Source code for kdiagram.utils.diagnose_q

#   License: Apache 2.0
#   Author: LKouadio <etanoyau@gmail.com>

"""
Diagnose and validating quantile-related operations.
Provides utilities for handling quantile data in various formats
and ensuring compatibility with expected structures.
"""

import operator
import re
import warnings
from collections.abc import Sequence
from typing import (
    Any,
    Optional,
    Union,
)

import numpy as np
import pandas as pd

from .validator import is_frame, is_in_if

__all__ = [
    "to_iterable",
    "validate_quantiles",
    "validate_quantiles_in",
    "validate_q_dict",
    "check_forecast_mode",
    "detect_quantiles_in",
    "build_q_column_names",
    "detect_digits",
    "validate_consistency_q",
    "parse_qcols",
    "validate_qcols",
    "build_qcols_multiple",
]


def parse_qcols(q_cols, fallback_cols=None, error="warn"):
    r"""
    parse_qcols is a utility function designed to interpret
    quantile column mappings from either a dictionary or list.
    It automatically identifies the lowest quantile, the median
    quantile (preferably 50 if available), and the highest
    quantile. The remaining quantiles, if any, are accessible
    through a parsed dictionary. This utility helps streamline
    the process of extracting quantile-based columns for later
    processing or plotting [1]_.

    Given a set of quantiles in :math:`q`, typically named
    like ``q10``, ``q50``, ``q90``, parse_qcols attempts to
    extract the numeric part of each name. For example:

    .. math::
       q_{key} = \\text{float}(\\text{key}[1:])

    If ``'q50'`` is found, it is treated as the median quantile.
    Otherwise, parse_qcols uses the central element of the
    sorted list of parsed quantiles. The minimum quantile
    becomes the "lowest" and the maximum becomes the "highest."

    Parameters
    ------------
    q_cols : dict or list, optional
        A collection of quantile definitions. If this
        parameter is a dictionary with keys like ``q10``,
        ``q50``, or ``q90``, the numeric portion is parsed.
        If it is a list, items are assigned dummy keys in
        ascending order (``q0``, ``q1``, etc.).
    fallback_cols : tuple of str, optional
        A 3-tuple (``lower_col``, ``median_col``,
        ``upper_col``) to return if the quantile parsing
        fails or if <parameter `q_cols`> is None.
    error : {'warn', 'raise', 'ignore'}, optional
        A function used to warn about parsing issues. If
        'raise', error raises rather than warning issues.

    Returns
    ---------
    dict
        A dictionary containing:
        - ``lowest_col``: The column name of the lowest
          quantile.
        - ``median_col``: The column name of the median
          quantile (preferably 50).
        - ``highest_col``: The column name of the highest
          quantile.
        - ``parsed_qvals``: A mapping of parsed quantile
          floats to their column names.
        - ``valid``: A boolean indicating whether valid
          quantiles were parsed.

    Notes
    -------
    By default, parse_qcols handles numeric quantile keys that
    begin with the letter 'q', followed by a valid float value
    (e.g., ``q10`` -> 10.0). Keys that cannot be converted into
    floats are ignored. If none are valid, parse_qcols returns
    values from <parameter `fallback_cols`>.

    Examples
    ----------
    >>> from kdiagram.utils.diagnose_q import parse_qcols
    >>> # Example dictionary
    >>> q_def = {'q10': 'low_10', 'q50': 'med_50', 'q90': 'hi_90'}
    >>> result = parse_qcols(q_def)
    >>> result['lowest_col']
    'low_10'


    References
    ------------
    .. [1] Doe, A., & Smith, J. (2021). Dynamic quantile
       extraction in large datasets. Journal of Data
       Diagnostics, 4(2), 101-110.
    """
    # Provide a default warning function if none is passed
    if fallback_cols is None:
        fallback_cols = (None, None, None)

    # Prepare output structure with defaults
    output = {
        "lowest_col": fallback_cols[0],
        "median_col": fallback_cols[1],
        "highest_col": fallback_cols[2],
        "parsed_qvals": {},
        "valid": False,
    }

    # If q_cols is not provided, return fallback immediately
    if not q_cols:
        return output

    if isinstance(q_cols, str):
        q_cols = [q_cols]
    # If q_cols is a list, convert to dict with dummy keys (q0, q1, etc.)
    if isinstance(q_cols, (list, tuple)):
        q_cols = {f"q{i}": col_name for i, col_name in enumerate(q_cols)}

    # Parse keys like 'q10', 'q50', 'q90' into numeric floats
    parsed = {}
    for k, col_name in q_cols.items():
        if not isinstance(k, str):
            msg = f"Quantile key '{k}' is not a string. Skipped."
            if error == "warn":
                warnings.warn(msg, stacklevel=2)
            elif error == "raise":
                raise TypeError(msg)
            continue
        if not k.startswith("q"):
            msg = f"Key '{k}' is not prefixed with 'q'. Skipped."
            if error == "warn":
                warnings.warn(msg, stacklevel=2)
            elif error == "raise":
                raise ValueError(msg)
            continue
        # Attempt to convert 'q10' -> 10.0
        try:
            q_val = float(k[1:])
            parsed[q_val] = col_name
        except ValueError as e:
            msg = f"Cannot parse quantile '{k}'. Skipped."
            if error == "warn":
                warnings.warn(msg, stacklevel=2)
            elif error == "raise":
                raise ValueError(msg) from e

    # If nothing valid was parsed, return fallback
    if not parsed:
        msg = (
            "No valid quantile columns found in `q_cols`. "
            "Falling back to explicit columns if provided."
        )
        if error == "warn":
            warnings.warn(msg, stacklevel=2)
        elif error == "raise":
            raise ValueError(msg)

        return output

    # Sort parsed q-values
    sorted_qvals = sorted(parsed.keys())
    output["parsed_qvals"] = parsed
    output["valid"] = True

    # The lowest quantile
    output["lowest_col"] = parsed[sorted_qvals[0]]
    # The highest quantile
    output["highest_col"] = parsed[sorted_qvals[-1]]

    # For median, prefer '50' if it exists, else pick middle
    if 50.0 in parsed:
        output["median_col"] = parsed[50.0]
    else:
        mid_idx = len(sorted_qvals) // 2
        output["median_col"] = parsed[sorted_qvals[mid_idx]]

    return output


def check_forecast_mode(
    mode, q=None, error="raise", ops="validate", q_mode="strict", **kw
):
    r"""
    Check consistency between forecast mode and quantile values.

    This function verifies that the provided forecast `mode`
    is consistent with the quantile values (`q`). If the mode
    is ``"point"`` and quantile values are provided, it will
    either warn the user and reset `q` to ``None`` (if
    ``error=="warn"``) or raise a ValueError (if
    ``error=="raise"``). Similarly, if the mode is
    ``"quantile"`` and no quantile values are provided, it will
    either warn the user and set `q` to the default values
    ``[0.1, 0.5, 0.9]`` (if ``error=="warn"``) or raise a
    ValueError (if ``error=="raise"``).

    Additionally, if ``ops`` is set to ``"check_only"``, the function
    only performs the checks without modifying or returning ``q``.

    Parameters
    ------------
    mode : str
        Forecast mode, either ``"point"`` or
        ``"quantile"``.
    q : list of float, optional
        List of quantile values. Defaults to ``None``.
    error : str, optional
        Error handling behavior. If set to ``"raise"``, a
        ValueError is raised when an inconsistency is
        detected. If set to ``"warn"``, a warning is issued
        and a default behavior is applied.
    ops : str, optional
        Operation mode. If set to ``"check_only"``, the function only
        performs the checks without returning any value. If set to
        ``"validate"``, the function returns the validated
       (or updated) quantile values. Default is ``"validate"``.
    q_mode: {'strict', 'soft'}
       Validation quantiles mode. See more in
       :func:`kdiagram.utils.diagnose_q.validate_quantiles`
    *kw: dict,
        Additional keywords argument of :func:`kdiagram.utils.diagnose_q`.

    Returns
    ---------
    q : list of float or None
        The validated (or updated) quantile values if ``ops`` is
        ``"validate"``; otherwise, returns ``None``.

    Raises
    --------
    ValueError
        If an inconsistency is detected and ``error`` is set to
        ``"raise"``.

    Examples
    ----------
    >>> from gofast.utils.diagnose_q impor check_forecast_mode
    >>> check_forecast_mode("point", q=[0.1, 0.5, 0.9])
    # Raises a ValueError or warns and returns None based on the error flag.

    >>> check_forecast_mode("quantile", q=None, error="warn")
    # Issues a warning and returns [0.1, 0.5, 0.9].

    """

    # Ensure mode is valid.
    if mode not in ["point", "quantile"]:
        raise ValueError("mode must be either 'point' or 'quantile'.")

    # Handle the case for "point" mode.
    if mode == "point":
        if q is not None:
            msg = (
                "In point mode, quantile values (q) should be None. "
                "Resetting q to None."
            )
            if error == "warn":
                warnings.warn(msg, stacklevel=2)

            elif error == "raise":
                raise ValueError(msg)

            q = None
    # Handle the case for "quantile" mode.
    elif mode == "quantile":
        if q is None:
            msg = (
                "In quantile mode, quantile values (q) must be provided. "
                "Setting default quantiles to [0.1, 0.5, 0.9]."
            )
            if error == "warn":
                warnings.warn(msg, stacklevel=2)

            elif error == "raise":
                raise ValueError(msg)

            q = [0.1, 0.5, 0.9]
        # then validate quantiles
        q = validate_quantiles(q, mode=q_mode, **kw)
    # If ops is "check_only", simply return None.
    if ops == "check_only":
        return None
    else:
        return q


def to_iterable(
    obj: Any,
    exclude_string: bool = False,
    transform: bool = False,
    parse_string: bool = False,
    flatten: bool = False,
    unique: bool = False,
    delimiter: str = r"[ ,;|\t\n]+",
) -> Union[bool, list[Any]]:
    r"""
    Determines if an object is iterable, with options to transform, parse,
    and modify the input for flexible iterable handling.

    Parameters
    ------------
    obj : Any
        Object to be evaluated or transformed into an iterable.
    exclude_string : bool, default=False
        Excludes strings from being considered as iterable objects.
    transform : bool, default=False
        Transforms `obj` into an iterable if it isn't already. Defaults to
        wrapping `obj` in a list.
    parse_string : bool, default=False
        If `obj` is a string, splits it into a list based on the specified
        `delimiter`. Requires `transform=True`.
    flatten : bool, default=False
        If `obj` is a nested iterable, flattens it into a single list.
    unique : bool, default=False
        Ensures unique elements in the output if `transform=True`.
    delimiter : str, default=r'[ ,;|\t\n]+'
        Regular expression pattern for splitting strings when `parse_string=True`.

    Returns
    ---------
    bool or List[Any]
        Returns a boolean if `transform=False`, or an iterable if
        `transform=True`.

    Raises
    --------
    ValueError
        If `parse_string=True` without `transform=True`, or if `delimiter`
        is invalid.

    Notes
    -------
    - When `parse_string` is used, strings are split by `delimiter` to form a
      list of substrings.
    - `flatten` and `unique` apply only when `transform=True`.
    - Using `unique=True` ensures no duplicate values in the output.

    Examples
    ----------
    >>> from kdiagram.utils.diagnose_q import to_iterable
    >>> to_iterable("word", exclude_string=True)
    False

    >>> to_iterable(123, transform=True)
    [123]

    >>> to_iterable("parse, this sentence", transform=True, parse_string=True)
    ['parse', 'this', 'sentence']

    >>> to_iterable([1, [2, 3], [4]], transform=True, flatten=True)
    [1, 2, 3, 4]

    >>> to_iterable("a,b,a,b", transform=True, parse_string=True, unique=True)
    ['a', 'b']
    """
    if parse_string and not transform:
        raise ValueError(
            "Set 'transform=True' when using 'parse_string=True'."
        )

    # Check if object is iterable (excluding strings if specified)
    is_iterable = hasattr(obj, "__iter__") and not (
        exclude_string and isinstance(obj, str)
    )

    # If transformation is not needed, return the boolean check
    if not transform:
        return is_iterable

    # If string parsing is enabled and obj is a string, split it using delimiter
    if isinstance(obj, str) and parse_string:
        obj = re.split(delimiter, obj.strip())

    # Wrap non-iterables into a list if they aren't iterable
    elif not is_iterable:
        obj = [obj]

    # Flatten nested iterables if flatten=True
    if flatten:
        obj = _flatten(obj)

    # Apply unique filtering if requested
    if unique:
        obj = list(
            dict.fromkeys(obj)
        )  # Preserves order while ensuring uniqueness

    return obj


def _flatten(nested_list: Any) -> list[Any]:
    """Helper function to recursively flatten a nested list structure."""
    flattened = []
    for element in nested_list:
        if isinstance(element, (list, tuple, set)):
            flattened.extend(_flatten(element))
        else:
            flattened.append(element)
    return flattened


def validate_q_dict(q_dict, recheck=False):
    r"""
     Converts the keys of a dictionary of quantile columns (`q_dict`) from
     string representations to numeric values (float) if possible. If the key
     cannot be converted to a number, it returns the dictionary as is.

     Optionally validates the quantiles after conversion to ensure that all
     keys are within the valid range of quantiles [0, 1].

     Parameters
     ------------
     q_dict : dict
         A dictionary where the keys represent quantiles (either as
         strings like '0.1' or '10%') and the values are lists of
         column names associated with those quantiles.

     recheck : bool, optional, default=False
         If `True`, the keys of the dictionary will be validated as
         quantiles after the conversion. This validation checks that
         all keys lie within the range [0, 1].

     Returns
     ---------
     dict
         A dictionary with numeric keys if conversion is successful,
         otherwise the original dictionary. The keys are either floats
         representing quantiles or the original keys if they cannot
         be converted.

     Notes
     -------
     The function performs the following steps:

     1. Iterates over the dictionary to check whether each key can be
        converted to a numeric value.
     2. If a key is a string containing a percentage (e.g., '10%'),
        it removes the '%' sign and divides the value by 100 to
        convert it to a float.
     3. If a key can be successfully converted, it is stored as a
        floating-point number in the resulting dictionary.
     4. If the conversion fails (due to a `ValueError`, `TypeError`, or
        `AttributeError`), the original key is retained in the dictionary.
     5. If `recheck` is `True`, it validates the converted quantiles by
        ensuring they are in the range [0, 1].

     The function is designed to handle both direct float conversions (e.g.,
     '0.1') and percentage-based representations (e.g., '10%' becomes 0.1).

     Example
     ---------
     >>> q_dict = {'0.1': ['subsidence_q10'], '50%': ['subsidence_q50'],
                   '90%': ['subsidence_q90']}
     >>> validate_q_dict(q_dict)
     {0.1: ['subsidence_q10'], 0.5: ['subsidence_q50'], 0.9: ['subsidence_q90']}

     >>> q_dict = {'0.1': ['subsidence_q10'], '0.5': ['subsidence_q50'],
                   '0.9': ['subsidence_q90']}
     >>> validate_q_dict(q_dict)
     {'0.1': ['subsidence_q10'], 0.5: ['subsidence_q50'], 'high': ['subsidence_q90']}

     >>> q_dict = {'0.1': ['subsidence_q10'], '200%': ['subsidence_q200']}
     >>> validate_q_dict(q_dict, recheck=True)
     {0.1: ['subsidence_q10'], 2.0: ['subsidence_q200']}

     See Also
     ----------
    validate_quantiles`:
        Validates if the values are valid quantiles in the range [0, 1].

     References
     ------------
     .. [1] Hyndman, R. J., & Fan, Y. (1996). Sample quantiles in
            statistical packages. The American Statistician, 50(4), 361-365.
     .. [2] Weiss, N. A. (2015). Introductory Statistics. Pearson.
    """
    # Initialize an empty dictionary to store the converted quantiles
    new_q_cols = {}

    if not isinstance(q_dict, dict):
        raise TypeError(
            f"Expected a dictionary for `q_dict`, but "
            f"got {type(q_dict).__name__}. Ensure that `q_dict`"
            " is a dictionary where the keys represent quantiles "
            "(either as strings like '0.1' or '10%') and the values"
            " are lists of column names."
        )
    for key, value in q_dict.items():
        try:
            # Check if the key contains a percentage sign and convert it
            if "%" in str(key):
                key_float = float(key.replace("%", "")) / 100.0
            else:
                # Directly convert the key to float
                key_float = float(key)

            new_q_cols[key_float] = value

        except (AttributeError, ValueError, TypeError):
            # If conversion fails, retain the original key
            new_q_cols[key] = value

    if recheck:
        # Validate the quantiles after conversion
        # (i.e., ensure keys are between 0 and 1)
        validate_quantiles(list(new_q_cols.keys()), dtype="float64")

    return new_q_cols


def validate_quantiles(
    quantiles,
    asarray=False,
    round_digits=2,
    dtype=None,
    mode="strict",
    scale_method="uniform",
):
    r"""
    Validate and normalize quantile values with flexible conversion rules.

    Ensures quantile inputs are valid probabilities :math:`q \in [0,1]` while
    providing mechanisms for automatic value adjustment through different
    scaling strategies [1]_.

    .. math::

        Q_{\text{adj}} = \frac{q_{\text{raw}}}{10^{\lfloor \log_{10}(q_{\text{raw}}) \rfloor + 1}}

    where :math:`q_{\text{raw}}` is the input value requiring adjustment.

    Parameters
    ------------
    quantiles : array-like
        Input values to validate. Accepts:
        - Numeric values in [0,1]
        - Percentages (e.g., "20%")
        - Integers for automatic scaling in ``mode='soft'``
    asarray : bool, default=False
        Determines output format:
        - ``True``: Returns numpy array
        - ``False``: Returns Python list
    round_digits : int, default=2
        Number of decimal places for rounding to mitigate floating-point
        precision issues
    dtype : str or numpy.dtype, default='float32'
        Output data type. Supported values: 'float32' (TF-compatible) or
        'float64' (high precision)
    mode : {'strict', 'soft'}, default='strict'
        Validation strictness:
        - ``'strict'``: Rejects values outside [0,1]
        - ``'soft'``: Converts percentages and scales integers using
           ``scale_method``
    scale_method : {'uniform', 'individual'}, default='uniform'
        Scaling strategy for ``mode='soft'``:
        - ``'uniform'``: Uses maximum digit count from all values for divisor
        - ``'individual'``: Scales each value independently

    Returns
    ---------
    list or numpy.ndarray
        Validated quantiles in specified format. Return type matches
        ``asarray`` parameter.

    Raises
    ------
    TypeError
        For non-numeric inputs in ``mode='strict'`` or invalid types in
        ``mode='soft'``
    ValueError
        For values outside [0,1] in ``mode='strict'`` or invalid scaling
        conversions

    Examples
    -----------
    Basic validation:
    >>> from kdiagram.utils.diagnose_q import validate_quantiles
    >>> validate_quantiles([0.1, 0.5, 0.9])
    [0.1, 0.5, 0.9]

    Soft mode with percentage conversion:
    >>> validate_quantiles(["20%", 5, 150], mode='soft')
    [0.2, 0.05, 0.15]

    Array output with custom precision:
    >>> validate_quantiles([0.123456, 0.789012], asarray=True, round_digits=3)
    array([0.123, 0.789], dtype=float32)

    Notes
    --------
    1. In ``mode='soft'``:
       - Percentages convert via :math:`\frac{\text{value}}{100}`
       - Integer scaling uses:
         - Uniform: :math:`\frac{\text{value}}{10^{\text{max\_digits}}}`
         - Individual: :math:`\frac{\text{value}}{10^{\text{self\_digits}}}`

    2. Rounding follows banker's rounding (numpy.round behavior) to minimize
       cumulative errors [2]_.

    See Also
    ----------
    gofast.stats.evaluate_quantiles : Evaluates quantile estimation accuracy
    numpy.quantile : Computes quantiles of array values

    References
    ------------
    .. [1] IEEE Standard for Floating-Point Arithmetic. IEEE Std 754-2019.

    .. [2] Hyndman, R.J. & Fan, Y. (1996). Sample Quantiles in Statistical
           Packages. The American Statistician, 50(4), 361-365.
    """
    quantiles = to_iterable(quantiles, transform=True, flatten=True)

    if mode == "soft":
        quantiles = _process_soft_quantiles(
            quantiles, scale_method=scale_method
        )

    if not isinstance(quantiles, (list, np.ndarray)):
        raise TypeError(
            "Quantiles must be list or numpy array. "
            f"Received {type(quantiles).__name__}."
        )

    dtype = _get_valid_dtype(dtype)
    quantiles_np = np.array(quantiles, dtype=dtype)

    _validate_quantile_values(quantiles_np)
    quantiles_np = np.round(quantiles_np, decimals=round_digits)

    return quantiles_np if asarray else quantiles_np.tolist()


def _process_soft_quantiles(quantiles, scale_method):
    """Process quantiles in soft mode with scaling adjustments."""
    scaled_values = []
    scale_candidates = []

    for q in quantiles:
        q_val, needs_scaling = _process_single_quantile(q)

        if needs_scaling:
            scale_candidates.append(q_val)
            scaled_values.append(None)
        else:
            scaled_values.append(q_val)

    if scale_candidates:
        scaled = _apply_scaling(scale_candidates, scale_method=scale_method)
        scaled_values = _merge_scaled_values(scaled_values, scaled)

    return scaled_values


def _process_single_quantile(q):
    """Process individual quantile value for soft mode."""
    original = q
    q = _convert_string_quantile(q)

    if not isinstance(q, (int, float)):
        raise TypeError(
            f"Quantile {original} must be numeric. "
            f"Received {type(q).__name__}."
        )

    if q < 0:
        raise ValueError(f"Negative quantile value: {original}")

    if 0 <= q <= 1:
        return q, False

    if not np.isclose(q, int(q)):
        raise ValueError(f"Non-integer out-of-range quantile: {original}")

    return int(q), True


def _convert_string_quantile(q):
    """Convert string quantiles to numeric values."""
    if isinstance(q, str):
        q = q.strip().rstrip("%")
        try:
            value = float(q)
            if "%" in q:
                value /= 100.0
            return value
        except ValueError:
            raise ValueError(
                f"Could not convert string quantile: {q}"
            ) from None
    return q


def _apply_scaling(scale_candidates, scale_method):
    """Apply scaling strategy to out-of-range quantiles."""
    if scale_method == "uniform":
        max_digits = max(len(str(q)) for q in scale_candidates)
        divisor = 10**max_digits
        return [q / divisor for q in scale_candidates]

    if scale_method == "individual":
        return [q / (10 ** len(str(q))) for q in scale_candidates]

    raise ValueError(
        f"Invalid scale_method: {scale_method}. "
        "Choose 'uniform' or 'individual'."
    )


def _merge_scaled_values(values, scaled):
    """Merge scaled values back into original quantile list."""
    result = []
    scale_idx = 0

    for val in values:
        if val is None:
            result.append(scaled[scale_idx])
            scale_idx += 1
        else:
            result.append(val)

    return result


def _get_valid_dtype(dtype):
    """Validate and return proper numpy dtype."""
    dtype_map = {"float32": np.float32, "float64": np.float64}

    if dtype is None:
        return np.float32

    if isinstance(dtype, str) and dtype in dtype_map:
        return dtype_map[dtype]

    return dtype if dtype in (np.float32, np.float64) else np.float32


def _validate_quantile_values(quantiles_np):
    """Core validation for quantile value requirements."""
    if not np.issubdtype(quantiles_np.dtype, np.number):
        raise ValueError("All quantiles must be numeric.")

    if np.any((quantiles_np < 0) | (quantiles_np > 1)):
        raise ValueError(
            "Quantiles must be in [0, 1] range. "
            "Use 'soft' mode for automatic scaling."
        )


def validate_quantiles_in(
    quantiles,
    asarray=False,
    round_digits=1,
    dtype=None,
    mode="strict",
):
    r"""
    Validates the input quantiles and optionally returns the output as a
    numpy array or list, with an option to round the quantiles to a
    specified number of decimal places to avoid floating-point precision
    issues.

    Quantiles are numerical values used in statistical analysis to
    divide a distribution into intervals. They must lie within the
    range [0, 1] as they represent proportions of data [1]_.

    Parameters
    ----------
    quantiles : list or numpy.ndarray
        Input array-like containing quantile values to be validated.
        The values must be numeric and within the range [0, 1].

    asarray : bool, optional
        Determines the output format. If `True`, the validated
        quantiles are returned as a numpy array. If `False`, they
        are returned as a list. Default is `False`.

    round_digits : int, optional, default=1
        The number of decimal places to which the quantiles should be
        rounded. This helps avoid floating-point precision errors such as
        `0.10000000149011612` being displayed as `0.1`. By default,
        quantiles are rounded to 1 decimal place.

    dtype : numpy.dtype, optional, default=np.float32
        The data type for the quantiles array. Use `np.float32`
        for compatibility with TensorFlow or `np.float64` for higher
        precision. The dtype determines the precision used for quantiles
        during validation and rounding.

    Returns
    -------
    list or numpy.ndarray
        A list or numpy array of validated quantile values, depending
        on the value of `asarray`.

    Raises
    ------
    TypeError
        If the input `quantiles` is not a list or numpy array.

    ValueError
        If any element of `quantiles` is not numeric or lies outside
        the range [0, 1].

    Notes
    -----
    Quantiles, denoted as :math:`q \in [0, 1]`, represent the fraction
    of observations below a certain value in a distribution:

    .. math::

        Q(q) = \inf \{ x \in \mathbb{R} : P(X \leq x) \geq q \}

    where :math:`Q(q)` is the quantile function, and :math:`q` is the
    proportion [2]_.

    This function ensures that all values in `quantiles` adhere to
    this definition by checking:

    1. The type of `quantiles`.
    2. The numerical nature of its elements.
    3. The range of its values.
    4. The optional rounding of the quantiles to a specified number
       of decimal places.

    Examples
    --------
    >>> from gofast.utils.diagnose_q import validate_quantiles_in
    >>> validate_quantiles([0.1, 0.2, 0.5])
    [0.1, 0.2, 0.5]

    >>> validate_quantiles(np.array([0.3, 0.7, 0.9]), asarray=True)
    array([0.3, 0.7, 0.9])

    >>> validate_quantiles([0.10000000149011612, 0.5, 0.8999999761581421], round_digits=1)
    [0.1, 0.5, 0.9]

    >>> validate_quantiles([0.5, 1.2])
    ValueError: All quantile values must be in the range [0, 1].

    See Also
    --------
    numpy.percentile : Computes the nth percentile of an array.
    numpy.quantile : Computes the qth quantile of an array.

    References
    ----------
    .. [1] Hyndman, R. J., & Fan, Y. (1996). Sample quantiles in
           statistical packages. The American Statistician, 50(4), 361-365.
    .. [2] Weiss, N. A. (2015). Introductory Statistics. Pearson.

    """
    # Convert quantiles to a list if necessary
    quantiles = to_iterable(quantiles, transform=True, flatten=True)

    # Validate input type: must be list or numpy array
    if not isinstance(quantiles, (list, np.ndarray)):
        raise TypeError(
            "Quantiles must be a list or numpy array. Received "
            f"{type(quantiles).__name__!r}."
        )

    # Define a dictionary for mapping string dtype names to numpy float types
    dtypes = {"float32": np.float32, "float64": np.float64}

    # Check if dtype is a string, and convert
    # it to the corresponding numpy dtype
    if dtype is None:
        dtype = "float32"
    if isinstance(dtype, str):
        if dtype not in dtypes:
            raise ValueError(
                f"Unsupported dtype string: {dtype}."
                " Supported values are 'float32' or 'float64'."
            )
        # Convert string to corresponding numpy dtype
        dtype = dtypes[dtype]

    # Convert input to numpy array for consistent
    # processing using the specified dtype
    quantiles = np.array(quantiles, dtype=dtype)

    # Validate that all elements are numeric
    if not np.issubdtype(quantiles.dtype, np.number):
        raise ValueError("All quantile values must be numeric.")

    # Validate that all values are within the range [0, 1]
    if not np.all((quantiles >= 0) & (quantiles <= 1)):
        raise ValueError("All quantile values must be in the range [0, 1].")

    # Round quantiles to the specified number of decimal places
    quantiles = np.round(quantiles, decimals=round_digits)

    # Return quantiles in the desired format
    return quantiles if asarray else quantiles.tolist()


[docs] def detect_quantiles_in( df: pd.DataFrame, col_prefix: Optional[str] = None, dt_value: Optional[list[str]] = None, mode: str = "soft", return_types: str = "columns", verbose: int = 0, ) -> Union[list[str], list[float], list[np.ndarray], pd.DataFrame, None]: r""" Detect quantile columns in a DataFrame using naming patterns and value validation. Identifies columns containing quantile data through structured naming conventions and value validation [1]_. Supports both absolute and normalized quantile representations through mode-based value adjustment [2]_. Parameters ---------- df : pd.DataFrame Input DataFrame containing potential quantile columns. Column names must be strings. col_prefix : str, optional Column name prefix for targeted search (e.g., ``'price'`` for ``price_q0.25``). If None, scans all columns. dt_value : list of str, optional Date filters for temporal quantile detection (e.g., ``['2023']`` matches columns like ``price_2023_q0.5``). mode : {'soft', 'strict'}, default='soft' Value handling strategy: - ``'soft'``: Normalizes values >1 to 1.0 using min-max scaling - ``'strict'``: Excludes values outside [0,1] range return_types : {'columns', 'q_val', 'values', 'frame'}, default='columns' Return format specification: - ``'columns'``: List of column names - ``'q_val'``: Sorted unique quantile values - ``'values'``: Column data arrays - ``'frame'``: DataFrame subset verbose : {0, 1, 2, 3}, default=0 Output verbosity: - 0: Silent - 1: Basic scan info - 2: Per-column matches - 3: Full diagnostic output Returns ------- Union[List[str], List[float], List[np.ndarray], pd.DataFrame, None] Quantile data in format specified by ``return_types``. Returns None if no quantiles detected. Notes ----- The detection adjustment can be formulated as : .. math:: q_{\text{adj}} = \begin{cases} \min(1, \max(0, q_{\text{raw}})) & \text{if } mode=\text{'soft'} \\ q_{\text{raw}} & \text{if } q \in [0,1] \text{ and } mode=\text{'strict'} \end{cases} 1. Column name pattern requirements: - Requires ``_qX`` suffix where X is numeric - Temporal format: ``{prefix}_{date}_q{value}`` - Non-temporal format: ``{prefix}_q{value}`` 2. Value adjustment in soft mode uses piecewise function: - Clips values to [0,1] range - Preserves original values within valid range Examples -------- >>> from kdiagram.utils.diagnose_q import detect_quantiles_in >>> import pandas as pd >>> >>> # Basic detection >>> df = pd.DataFrame({'sales_q0.25': [4.2], 'sales_q0.75': [5.8]}) >>> detect_quantiles_in(df, col_prefix='sales') ['sales_q0.25', 'sales_q0.75'] >>> >>> # Temporal quantile filtering >>> df = pd.DataFrame({'temp_2023_q0.5': [22.1], 'temp_2024_q0.5': [23.4]}) >>> detect_quantiles_in(df, dt_value=['2023'], return_types='q_val') [0.5] >>> >>> # Value normalization >>> df = pd.DataFrame({'risk_q150': [0.8]}) >>> detect_quantiles_in(df, mode='soft', return_types='q_val') [1.0] See Also -------- kdiagram.utils.validate_quantiles : For quantile value validation pandas.DataFrame.filter : For column selection by pattern References ---------- .. [1] Regular Expression HOWTO, Python Documentation .. [2] Pandas API Reference: DataFrame operations """ is_frame(df, df_only=True, objname="Data 'df'") df.columns = df.columns.astype(str) col_prefix = col_prefix or "" quantile_columns = [] found_quantiles = set() _log_verbose( f"Scanning DataFrame columns with prefix: {col_prefix}", verbose, 1 ) for col in df.columns: result = _process_column(col, df, col_prefix, dt_value, mode, verbose) if result: q_val, col_data = result found_quantiles.add(q_val) _store_results( q_val, col_data, quantile_columns, return_types, col ) _log_verbose(f"Quantiles detected: {sorted(found_quantiles)}", verbose, 3) return _format_output(quantile_columns, found_quantiles, return_types, df)
def _process_column( col: str, df: pd.DataFrame, prefix: str, dt_values: Optional[list[str]], mode: str, verbose: int, ) -> Optional[tuple]: """Process individual column for quantile detection.""" # Handle both cases: with or without prefix if prefix: # Remove the prefix part if col.startswith(f"{prefix}_"): col_match = col[len(prefix) + 1 :] # Remove prefix and underscore else: return ( None # If the column does not start with the prefix, skip it ) else: col_match = col # No prefix, use the column name directly match, q_str = _check_column_match(col_match, dt_values) if not match: return None try: q_val = _extract_quantile_value(q_str, mode) except ValueError as e: _log_verbose(f"Invalid quantile value in {col}: {e}", verbose, 2) return None _log_verbose( f"Found quantile match: {col} with value: {q_val}", verbose, 2 ) return q_val, df[col].values def _check_column_match( remainder: str, dt_values: Optional[list[str]], prefix: Optional[str] = None, ) -> tuple: """Check if column remainder matches date and quantile patterns.""" # Match quantile pattern like q0.25 quantile_pattern = re.compile(r"q([\d\.]+)$") # Case 1: If a date filter is provided, look for # columns matching the date and quantile patterns if dt_values: for d_str in dt_values: date_pattern = f"^{d_str}_q" # Include date check # Match column with date filter if remainder.startswith(date_pattern): m = quantile_pattern.search(remainder) return (True, m.group(1)) if m else (False, None) # Case 2: If no date filter is provided, # look for quantile matches else: # Check for the quantile pattern in the remainder m = quantile_pattern.search(remainder) if m: # Return the quantile value (e.g., 0.25) return (True, m.group(1)) return (False, None) def _extract_quantile_value(q_str: str, mode: str) -> float: """Extract and validate quantile value with proper error handling.""" try: q_val = float(q_str) except ValueError as e: raise ValueError(f"Invalid quantile format: {q_str}") from e # Use centralized validation from validate_quantiles validated = validate_quantiles( [q_val], mode=mode, scale_method="individual", round_digits=2, dtype=np.float64, ) return validated[0] def _store_results( q_val: float, col_data: np.ndarray, quantile_columns: list, return_types: str, col: str, ) -> None: """Store results based on requested return type.""" if return_types == "values": quantile_columns.append(col_data) elif return_types == "q_val": quantile_columns.append(q_val) else: quantile_columns.append(col) def _format_output( quantile_columns: list, found_quantiles: set, return_types: str, df: pd.DataFrame, ) -> Union[list, pd.DataFrame, None]: """Format final output based on return_types.""" if not quantile_columns: return None if return_types == "frame": return df[quantile_columns] if return_types == "q_val": return sorted(found_quantiles) if return_types == "values": return np.vstack(quantile_columns) if quantile_columns else [] return sorted(quantile_columns) def _log_verbose( message: str, verbose_level: int, required_level: int ) -> None: """Centralized verbose logging control.""" if verbose_level >= required_level: print(message)
[docs] def build_q_column_names( df: pd.DataFrame, quantiles: list[Union[float, str]], value_prefix: Optional[str] = None, dt_value: Optional[list[Union[str, int]]] = None, strict_match: bool = True, ) -> list[str]: r""" Generate and validate quantile column names following naming conventions. Parameters ---------- df : pd.DataFrame Target DataFrame containing potential quantile columns quantiles : list of float/str Quantile values to search for (0 < q < 1). Accepts: - Float values (e.g., 0.25) - String representations (e.g., "25%") value_prefix : str, optional Column name prefix for structured naming. If None, looks for unprefixed columns. dt_value : list of str/int, optional Temporal identifiers for time-aware quantiles. Converts all values to strings. strict_match : bool, default=True Matching strategy: - ``True``: Requires exact column name matches - ``False``: Uses regex pattern matching for flexible detection Returns ------- list Valid column names found in the DataFrame matching the quantile naming pattern. Notes ----- Constructs column names using the pattern: .. math:: \text{col_name} = \begin{cases} \text{value_prefix}\_\text{date}\_q\text{quantile} & \text{if both prefix and date exist} \\ \text{value_prefix}\_q\text{quantile} & \text{if only prefix exists} \\ \text{date}\_q\text{quantile} & \text{if only date exists} \\ q\text{quantile} & \text{otherwise} \end{cases} Examples -------- >>> from kdiagram.utils.diagnose_q import build_q_column_names >>> import pandas as pd >>> >>> # Basic usage with prefix >>> df = pd.DataFrame(columns=['price_q0.25', 'price_2023_q0.5']) >>> build_q_column_names(df, [0.25, 0.5], 'price') >>> >>> # if strict_match ts ['price_q0.25', 'price_2023_q0.5'] >>> >>> # Date-filtered search >>> build_q_column_names(df, [0.5], 'price', dt_value=['2023']) ['price_2023_q0.5'] >>> >>> # Unprefixed columns >>> df = pd.DataFrame(columns=['q0.75', '2024_q0.9']) >>> build_q_column_names(df, [0.75, 0.9]) ['q0.75', '2024_q0.9'] See Also -------- kdiagram.utils.diagnose_q.validate_quantiles : For quantile value validation pandas.Series.str.contains : For column pattern matching """ is_frame(df, df_only=True, objname="Data 'df'") # Validate and normalize inputs valid_quantiles = validate_quantiles( quantiles, mode="soft", round_digits=2, dtype="float64", ) date_strings = _process_dt_values(dt_value) df.columns = df.columns.astype(str) if strict_match: candidates = _generate_strict_candidates( valid_quantiles, value_prefix, date_strings ) return [col for col in candidates if col in df.columns] # Flexible pattern matching pattern = _build_flexible_pattern( valid_quantiles, value_prefix, date_strings ) return [col for col in df.columns if pattern.search(col)]
def _generate_strict_candidates( quantiles: list[float], prefix: Optional[str], dates: list[str] ) -> list[str]: """Generate exact match candidates in all valid formats.""" candidates = [] for q in quantiles: # Decimal format (q0.25) dec_str = f"q{q:.4f}".rstrip("0").rstrip(".") # Percentage format (q25) pct_str = f"q{int(round(q * 100))}" for fmt in [dec_str, pct_str]: # Temporal candidates if dates: candidates.extend( f"{prefix}_{d}_{fmt}" if prefix else f"{d}_{fmt}" for d in dates ) # Non-temporal candidates candidates.append(f"{prefix}_{fmt}" if prefix else fmt) return list(set(candidates)) # Remove duplicates def _build_flexible_pattern( quantiles: list[float], prefix: Optional[str], dates: list[str] ) -> re.Pattern: """Build regex pattern for flexible quantile matching.""" # Quantile alternatives (0.25|25) q_alternatives = "|".join( f"{q:.4f}".rstrip("0").rstrip(".") + "|" + str(int(round(q * 100))) for q in quantiles ) # Prefix component prefix_part = f"{re.escape(prefix)}_?" if prefix else "" # Date component date_part = ( f"({'|'.join(map(re.escape, dates))})_+" if dates else r"\d{4}_?|" ) return re.compile( rf"^{prefix_part}(?:{date_part})?q({q_alternatives})\b", flags=re.IGNORECASE, ) def _process_dt_values( dt_values: Optional[list[Union[str, int]]], ) -> list[str]: """Normalize temporal values to standardized strings.""" return [str(v).strip() for v in dt_values] if dt_values else [] def detect_digits( value, pattern: str = None, as_q: bool = False, return_unique: bool = False, sort: bool = False, error: str = "ignore", verbose: int = 0, ) -> list: r""" Detect numeric values in a string or list of strings. This function extracts numeric values from the input by applying a robust regular expression. When used in quantile mode (i.e., when ``as_q`` is True), it captures numbers that appear immediately after the substring ``_q`` and before either ``_step`` or the end-of-string. In general mode (when ``as_q`` is False), it uses a conventional digit detector. The extracted numeric values are converted to floats. .. math:: \text{Extracted Value} = \text{value after } `\_q` \text{ and before } (\texttt{\_step} \text{ or end-of-string}) Parameters ---------- value : Union[str, List[str]] A string or a list of strings from which to extract numeric values. pattern : str, optional A custom regular expression pattern. If ``None``, the default is: - If ``as_q`` is True: ``"(?<=_q)(\\d+(?:\\.\\d+)?)(?=(_step|$))"``. - Otherwise: ``(?<!\d)(\d+(?:\.\d+)?)(?!\d)""``. as_q : bool, optional If True, converts each detected numeric value to a quantile value using soft mode via ``validate_quantiles``. Default is False. return_unique : bool, optional If True, returns only unique detected values. Default is False. sort : bool, optional If True, returns the detected numbers in ascending order. Default is False. error : str, optional Specifies how to handle conversion errors. Options are: ``"raise"`` to throw a ValueError, ``"warn"`` to print a warning message, or ``"ignore"`` to skip invalid matches. Default is ``"ignore"``. verbose : int, optional Verbosity level for debugging output. Higher values (e.g., 5 or above) produce more detailed logs. Default is 0. Returns ------- list A list of numeric values (floats) extracted from the input. If ``as_q`` is True, these values are converted to quantile values in soft mode. Examples -------- >>> from kdiagram.utils.diagnose_q import detect_digits >>> # Single string example: >>> detect_digits("subsidence_q10_step1") [10.0] >>> # List of strings: >>> detect_digits(["subsidence_q10_step1", ... "subsidence_q50_step1", ... "subsidence_q89_step1"]) [10.0, 50.0, 89.0] >>> # With conversion to quantile (soft mode): >>> detect_digits("subsidence_q10.5_step1", as_q=True) [0.105] # Example: converts 10.5 to 0.105 in soft mode. Notes ----- - The default regex pattern for quantile mode employs lookbehind and lookahead assertions to ensure that the numeric value is immediately preceded by ``_q`` and followed by ``_step`` or the end-of-string. - When ``as_q`` is False, a more general digit detection regex is used. - Input that is not a list is automatically converted to a list of strings. - The ``error`` parameter controls whether conversion issues raise an exception, warn the user, or are silently ignored. See Also -------- validate_quantiles : Converts numeric values to quantile values in soft mode. References ---------- .. [1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2009). *Introduction to Algorithms* (3rd ed.). MIT Press. .. [2] Aho, A. V., Lam, M. S., Sethi, R., & Ullman, J. D. (2006). *Compilers: Principles, Techniques, and Tools* (2nd ed.). Pearson. """ # If no custom regex pattern is provided, select a default pattern. if pattern is None: if as_q: # Use a pattern to capture numbers after '_q' # and before '_step' or end-of-string. pattern = r"(?<=_q)(\d+(?:\.\d+)?)(?=(_step|$))" else: # General robust digit detection using word boundaries. pattern = r"(?<!\d)(\d+(?:\.\d+)?)(?!\d)" # #r"(?<!\d)(\d+)(?!\d)" #r"\b\d+(?:\.\d+)?\b" # Compile the regex pattern. regex = re.compile(pattern) # Ensure the input is treated as a list of strings. if not isinstance(value, list): input_data = [str(value)] else: input_data = [str(item) for item in value] digits = [] # List to store detected numbers. # Iterate over each string in the input_data. for text in input_data: matches = regex.findall(text) if verbose >= 5: print(f"[DEBUG] Processing '{text}' => Matches: {matches}") for match in matches: try: # match can be a tuple (if using lookahead groups), # so extract the first element if needed. num_str = match[0] if isinstance(match, tuple) else match num = float(num_str) digits.append(num) except ValueError as exc: if error == "raise": raise ValueError( f"Could not convert '{match}' to float." ) from exc elif error == "warn": if verbose >= 1: print( f"[WARN] Skipping value '{match}': conversion failed." ) # If error is "ignore", continue without appending. continue # If conversion to quantile is requested, # convert numbers using soft mode. if as_q: digits = validate_quantiles( digits, mode="soft", round_digits=2, dtype=np.float64 ) # Remove duplicates if requested. if return_unique: digits = list(set(digits)) # Optionally sort the detected numbers. if sort: digits = sorted(digits) if verbose >= 3: print(f"[INFO] Detected digits: {digits}") return digits def validate_consistency_q( user_q: list[float], q_items: Union[str, list[Any]], error: str = "raise", mode: str = "soft", msg: Optional[str] = None, default_to: str = "valid_q", verbose: int = 0, ): r""" Validate the consistency of user-specified quantile values with those auto-detected from the input. This function compares the quantile values provided in ``user_q`` with the numeric values extracted from ``q_items`` (using :func:`detect_digits` with ``as_q=True``). Let :math:`Q_{user}` be the set of quantile values provided by the user and :math:`Q_{det}` be the set of quantile values detected from ``q_items``. In soft mode, the function returns the intersection, i.e., .. math:: Q_{valid} = Q_{user} \cap Q_{det}, whereas in strict mode, it expects an exact match and returns :math:`Q_{user}` directly. Parameters ---------- user_q : list of float A list of quantile values provided by the user. These represent the expected quantiles for evaluation or forecasting. q_items : Union[str, List[str], pandas.DataFrame] The source from which quantile values are auto-detected. This can be a string, a list of strings, or a DataFrame whose columns contain quantile information. error : str, optional Determines the error handling behavior if the user-specified quantiles do not match the detected values. Options are: - ``"raise"`` : Raise a ValueError. - ``"warn"`` : Emit a warning and continue. - ``"ignore"``: Silently ignore mismatches. Default is ``"raise"``. mode : str, optional The matching mode. In ``"soft"`` mode (default), the function returns the intersection of user and detected quantiles. In ``"strict"`` mode, the user-specified quantiles must exactly match those detected, and the function returns ``user_q``. msg : str, optional A custom error message to use if inconsistencies are found. If not provided, a default message is generated. default_to: str, default='valid_q' Return kind when inconsistent numbers found in quantiles. In ``'soft'`` mode, it controls whether to return the 'valid_q' valids quantiles or ``'auto_q'``for automatic_detected quanties. Defaut is the ``'valid_q'``. verbose : int, optional Verbosity level for debugging output. Higher values (e.g., 5 or above) yield more detailed logs. Default is 0. Returns ------- list A sorted list of validated quantile values (as floats) that are consistent between the user-specified values and those detected from ``q_items``. Examples -------- >>> from kdiagram.utils.diagnose_q import validate_consistency_q >>> user_quantiles = [0.1, 0.5, 0.9] >>> columns = ["subsidence_q10_step1", "subsidence_q50_step1", ... "subsidence_q90_step1", "other_column"] >>> validate_consistency_q(user_quantiles, columns) [0.1, 0.5, 0.9] Notes ----- This function leverages :func:`detect_digits` to extract numeric quantile values from the input and :func:`is_in_if` to compute the intersection between the user-specified and detected quantiles. In soft mode, minor discrepancies are tolerated; strict mode requires an exact match. See Also -------- detect_digits : Extracts numeric values from strings, including decimals. is_in_if : Checks membership and returns the intersection of lists. validate_quantiles : Converts numeric values to quantile values in soft mode. References ---------- .. [1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2009). *Introduction to Algorithms* (3rd ed.). MIT Press. .. [2] Aho, A. V., Lam, M. S., Sethi, R., & Ullman, J. D. (2006). *Compilers: Principles, Techniques, and Tools*. Pearson. """ # If q_items is a DataFrame, extract its columns. if isinstance(q_items, pd.DataFrame): q_items = q_items.columns # Detect quantile values from q_items using detect_digits in quantile mode. detected_q_values = detect_digits( q_items, as_q=True, sort=True, return_unique=True ) if verbose >= 5: print(f"[DEBUG] Detected quantile values: {detected_q_values}") # Use is_in_if to get the intersection between user_q and detected_q_values. valid_quantiles = is_in_if( sorted(user_q), detected_q_values, error=error, return_intersect=True ) if verbose >= 5: print( f"[DEBUG] Valid quantiles after intersection: {valid_quantiles}" ) # If valid_quantiles is not empty, sort it; otherwise, handle error. if valid_quantiles: valid_quantiles = sorted(valid_quantiles) else: default_err = ( "User provided quantiles do not match any detected " "quantile values in the DataFrame columns:" f" {user_q} != {detected_q_values}" ) err_msg = msg if msg is not None else default_err suff = ". Returning " + ( "an empty list." if default_to == "valid_q" else ("the detected values instead.") ) if error == "raise": raise ValueError(err_msg) elif error == "warn": warnings.warn(err_msg + f"{suff}", UserWarning, stacklevel=2) return [] if default_to == "valid_q" else detected_q_values # In strict mode, expect the user_q to exactly match the detected quantiles. if mode == "strict": # valid_quantiles = user_q valid_quantiles = _verify_identical_items( user_q, detected_q_values, ops="validate", objname="quantiles list", ) # Check consistency in count between valid and detected quantiles. if len(valid_quantiles) != len(detected_q_values): default_err = ( "Inconsistent number of quantiles: user provided " f"valid {len(valid_quantiles)} ({valid_quantiles}) vs detected " f"{len(detected_q_values)} ({detected_q_values})." ) err_msg = msg if msg is not None else default_err suff = " Returning " + ( "valid_quantiles instead." if default_to == "valid_q" else ("detected values instead.") ) if default_to == "valid_q": if error == "raise": raise ValueError(err_msg) elif error == "warn": warnings.warn(err_msg + f"{suff}", UserWarning, stacklevel=2) else: # 'auto_q' if error == "warn": warnings.warn(err_msg + f"{suff}", UserWarning, stacklevel=2) valid_quantiles = detected_q_values # Optionally sort the result if not already # sorted (redundant here, but for safety). return sorted(valid_quantiles) def _verify_identical_items( list1, list2, mode: str = "unique", ops: str = "check_only", error: str = "raise", objname: str = None, ) -> Union[bool, list]: r""" Check if two lists contain identical elements according to the specified mode. In "unique" mode, the function compares the unique elements in each list. In "ascending" mode, it compares elements pairwise in order. Parameters ---------- list1 : list The first list of items. list2` : list The second list of items. mode : {'unique', 'ascending'}, default="unique" The mode of comparison: - "unique": Compare unique elements (order-insensitive). - "ascending": Compare each element pairwise in order. ops : {'check_only', 'validate'}, default="check_only" If "check_only", returns True/False indicating a match. If "validate", returns the validated list. error : {'raise', 'warn', 'ignore'}, default="raise" Specifies how to handle mismatches. objname : str, optional A name to include in error messages. Returns ------- bool or list Depending on `ops`, returns True/False or the validated list. Examples -------- >>> from gofast.core.generic import verify_identical_items >>> list1 = [0.1, 0.5, 0.9] >>> list2 = [0.1, 0.5, 0.9] >>> verify_identical_items(list1, list2, mode="unique", ops="validate") [0.1, 0.5, 0.9] >>> verify_identical_items(list1, list2, mode="ascending", ops="check_only") True Notes ----- In "ascending" mode, both lists must have the same length, and the function compares each corresponding pair of elements. In "unique" mode, the function uses the set of unique values for comparison. If the lists contain mixed types, the function attempts to compare their string representations. """ # Validate mode. if mode not in ("unique", "ascending"): raise ValueError("mode must be either 'unique' or 'ascending'") if ops not in ("check_only", "validate"): raise ValueError("ops must be either 'check_only' or 'validate'") if error not in ("raise", "warn", "ignore"): raise ValueError("error must be one of 'raise', 'warn', or 'ignore'") # Ascending mode: compare each element in order. if mode == "ascending": if len(list1) != len(list2): msg = ( f"Length mismatch in {objname or 'object lists'}: " f"{len(list1)} vs {len(list2)}." ) if error == "raise": raise ValueError(msg) elif error == "warn": import warnings warnings.warn(msg, UserWarning, stacklevel=2) return False differences = [] for idx, (a, b) in enumerate(zip(list1, list2)): if a != b: differences.append((idx, a, b)) if differences: msg = ( f"Differences in {objname or 'object lists'}: {differences}." ) if error == "raise": raise ValueError(msg) elif error == "warn": import warnings warnings.warn(msg, UserWarning, stacklevel=2) return False return True if ops == "check_only" else list1 # Unique mode: compare the unique elements of each list. else: try: unique1 = sorted(set(list1)) unique2 = sorted(set(list2)) except Exception: unique1 = sorted({str(x) for x in list1}) unique2 = sorted({str(x) for x in list2}) if unique1 != unique2: msg = ( f"Inconsistent unique elements in {objname or 'object lists'}: " f"{unique1} vs {unique2}." ) if error == "raise": raise ValueError(msg) elif error == "warn": import warnings warnings.warn(msg, UserWarning, stacklevel=2) return False return True if ops == "check_only" else unique1 def validate_qcols( q_cols: Union[str, int, Sequence[Any]], ncols_exp: Optional[str] = None, err_msg: Optional[str] = None, ) -> list[str]: r""" Validate and standardise a collection of column names that represent quantiles or prediction outputs. The function `validate_qcols` converts the input to a clean list of strings, removes blanks, and—optionally—checks that the final list length satisfies an expectation expressed in `<ncols_exp>`. .. math:: \text{valid} = \bigl\\{\\,c \\mid c \neq ''\bigr\\} If an expectation is supplied, the function compares :math:`|\\,\text{valid}\\,|` to the requested condition and raises an error if the test fails. Parameters ---------- q_cols : list, str, tuple or set Column names to validate. May be a single string, an iterable of names, or any mixture thereof. Non‑string entries are cast to string. ncols_exp : str or None, optional Expectation on the number of columns. The string must begin with a comparison operator (``'==', '>=', '<=', '!=', '>'`` or ``'<'``) followed by an integer, e.g. ``'>=2'`` or ``'==3'``. If *None*, no length check is applied. err_msg : str or None, optional Custom message to raise if the expectation in `<ncols_exp>` is not met. If *None*, a default message is generated. Returns ------- list A cleaned list of column names that meet all checks. Raises ------ TypeError If `<q_cols>` is not a recognised container or string. ValueError If `<q_cols>` is empty after cleaning, or if the length check in `<ncols_exp>` fails. Examples -------- >>> from kdiagram.utils.diagnose_q import validate_qcols >>> validate_qcols('q50') ['q50'] >>> validate_qcols(['q10', 'q90'], ncols_exp='==2') ['q10', 'q90'] >>> validate_qcols(('p1', 'p2', ''), ncols_exp='>=2') ['p1', 'p2'] Notes ----- The expectation string is parsed by splitting on the first occurring comparison operator and casting the remainder to int. This avoids ambiguous patterns and guarantees that ``ops[op](len(cols), expected)`` is evaluated safely. See Also -------- operator : Built‑in module providing comparison functions. References ---------- .. [1] Harris, C. R. *et al.* (2020). Array programming with NumPy. *Nature*, 585, 357‑362. """ # Step‑1 : convert <q_cols> to a list of strings if q_cols is None: raise ValueError( "`q_cols` cannot be None. Provide at least one column name." ) if isinstance(q_cols, (str, int)): q_cols = [str(q_cols)] elif isinstance(q_cols, (tuple, set, list)): q_cols = [str(col) for col in q_cols] else: raise TypeError( "`q_cols` must be a list, tuple, set or single string." ) # Remove blanks and strip white‑space q_cols = [col.strip() for col in q_cols if col.strip()] if len(q_cols) == 0: raise ValueError("`q_cols` is empty after cleaning.") # Step‑2 : optional length expectation check if ncols_exp: _ops: dict[str, Any] = { "==": operator.eq, "=": operator.eq, "!=": operator.ne, ">=": operator.ge, "<=": operator.le, ">": operator.gt, "<": operator.lt, } # longest operators first (>=, <=, !=, ==) for sym in sorted(_ops, key=len, reverse=True): if ncols_exp.startswith(sym): num_str = ncols_exp[len(sym) :].strip() if not num_str.isdigit(): raise ValueError( f"Invalid expectation syntax '{ncols_exp}'." ) expected = int(num_str) if not _ops[sym](len(q_cols), expected): raise ValueError( err_msg or f"Expected {ncols_exp}, got " f"{len(q_cols)}: {q_cols}" ) break else: raise ValueError(f"Invalid `ncols_exp` format: {ncols_exp}") return q_cols def build_qcols_multiple( q_cols: Optional[Sequence[tuple[str, ...]]] = None, qlow_cols: Optional[Sequence[str]] = None, qup_cols: Optional[Sequence[str]] = None, qmed_cols: Optional[Sequence[str]] = None, *, enforce_triplet: bool = False, allow_pair_when_median: bool = False, ) -> list[tuple[str, ...]]: r""" Assemble and validate tuples of quantile columns. Parameters ---------- q_cols : sequence of tuple, optional Pre‑built tuples of column names. Each tuple can be ``(q10, q90)`` or ``(q10, q50, q90)``. If this argument is supplied, the helper bypasses the individual `qlow_cols`, `qup_cols`, and `qmed_cols` inputs. qlow_cols : sequence of str, optional Column names representing the lower quantile (e.g. 10 th percentile). qup_cols : sequence of str, optional Column names representing the upper quantile (e.g. 90 th percentile). qmed_cols : sequence of str, optional Column names representing the median (e.g. 50 th percentile). If provided, each tuple will be returned as ``(q10, q50, q90)``. enforce_triplet : bool, default=False * If ``True`` the output **must** be a triplet ``(low, med, up)``. Raises an error when `qmed_cols` is missing or when `q_cols` contains pairs. * If ``False`` the function returns pairs when no median columns are supplied. allow_pair_when_median : bool, default=False By default, when `qmed_cols` is supplied the helper always returns triplets. Set this flag to ``True`` to ignore `qmed_cols` and still output pairs ``(low, up)`` (useful for quick A/B tests). Returns ------- list of tuple A list of tuples with either two or three column names depending on the inputs and flags. Raises ------ ValueError On mismatched lengths, invalid tuple sizes, or missing mandatory inputs. Examples -------- >>> from kdiagram.utils.diagnose_q import build_qcols_multiple >>> # 1) Use pre‑built list of pairs >>> q_pairs = [('q10', 'q90'), ('lwr', 'upr')] >>> build_qcols_multiple(q_cols=q_pairs) [('q10', 'q90'), ('lwr', 'upr')] >>> # 2) Separate lower / upper lists (no median) >>> lows = ['q10', 'lwr'] >>> ups = ['q90', 'upr'] >>> build_qcols_multiple(qlow_cols=lows, qup_cols=ups) [('q10', 'q90'), ('lwr', 'upr')] >>> # 3) Triplets with median enforced >>> meds = ['q50', 'mid'] >>> build_qcols_multiple( ... qlow_cols=lows, ... qup_cols=ups, ... qmed_cols=meds, ... enforce_triplet=True ... ) [('q10', 'q50', 'q90'), ('lwr', 'mid', 'upr')] >>> # 4) Ignore supplied median and still get pairs >>> build_qcols_multiple( ... qlow_cols=lows, ... qup_cols=ups, ... qmed_cols=meds, ... allow_pair_when_median=True ... ) [('q10', 'q90'), ('lwr', 'upr')] """ # -------------------------------------------------- # Case‑1: user already passed `q_cols` # -------------------------------------------------- if q_cols is not None: if not all(isinstance(t, (list, tuple)) for t in q_cols): raise ValueError("`q_cols` must be an iterable of tuples.") sizes = {len(t) for t in q_cols} if sizes - {2, 3}: raise ValueError("`q_cols` tuples must have length 2 or 3.") if enforce_triplet and sizes != {3}: raise ValueError( "`enforce_triplet=True` requires every " "tuple in `q_cols` to have three items." ) if ( (not enforce_triplet) and (sizes == {3}) and allow_pair_when_median ): # convert triplets to pairs (low, up) q_cols = [(t[0], t[-1]) for t in q_cols] return [tuple(t) for t in q_cols] # -------------------------------------------------- # Case‑2: build tuples from separate lists # -------------------------------------------------- if qlow_cols is None or qup_cols is None: raise ValueError( "When `q_cols` is not provided, both " "`qlow_cols` and `qup_cols` must be given." ) if len(qlow_cols) != len(qup_cols): raise ValueError( "`qlow_cols` and `qup_cols` must be the same length." ) # -- median logic if qmed_cols is not None and not allow_pair_when_median: if len(qmed_cols) != len(qlow_cols): raise ValueError( "`qmed_cols` must be the same length as " "`qlow_cols` and `qup_cols`." ) tuples = list(zip(qlow_cols, qmed_cols, qup_cols)) else: if enforce_triplet: raise ValueError( "`enforce_triplet=True` but no median columns were supplied." ) tuples = list(zip(qlow_cols, qup_cols)) return [tuple(t) for t in tuples]