
Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Nodes API

Complete API documentation for all node classes and implementations.

Overview

Nodes are the building blocks of CUVIS.AI pipelines. This page documents all available node implementations organized by functional category.


Anomaly Detection Nodes

Statistical and deep learning methods for detecting anomalies in hyperspectral data.

RX Detector

rx_detector

RX anomaly detection nodes for hyperspectral imaging.

This module implements the Reed-Xiaoli (RX) anomaly detection algorithm, a widely used statistical method for detecting anomalies in hyperspectral images. The RX algorithm computes each pixel's squared Mahalanobis distance from the background distribution and treats pixels with large distances as potential anomalies.

The module provides two variants:

  • RXGlobal: Uses global statistics (mean, covariance) estimated from training data. Supports two-phase training: statistical initialization followed by optional gradient-based fine-tuning via unfreeze().

  • RXPerBatch: Computes statistics independently for each batch on-the-fly without requiring initialization. Useful for real-time processing or when training data is unavailable.

Reference: Reed, I. S., & Yu, X. (1990). "Adaptive multiple-band CFAR detection of an optical pattern with unknown spectral distribution." IEEE Transactions on Acoustics, Speech, and Signal Processing, 38(10), 1760-1770.

RXBase
RXBase(eps=1e-06, **kwargs)

Bases: Node

Base class for RX anomaly detectors.

Source code in cuvis_ai/anomaly/rx_detector.py
def __init__(self, eps: float = 1e-6, **kwargs) -> None:
    self.eps = eps
    super().__init__(eps=eps, **kwargs)
RXGlobal
RXGlobal(
    num_channels, eps=1e-06, cache_inverse=True, **kwargs
)

Bases: RXBase

RX anomaly detector with global background statistics.

Uses global mean (μ) and covariance (Σ) estimated from training data to compute Mahalanobis distance scores. Supports two-phase training: statistical initialization followed by optional gradient-based fine-tuning.

The detector computes anomaly scores as:

RX(x) = (x - μ)ᵀ Σ⁻¹ (x - μ)

where x is a pixel spectrum, μ is the background mean, and Σ is the covariance matrix.
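
To make the formula concrete, here is a minimal sketch in plain PyTorch (independent of the node API; mu, cov, and x are stand-in values, not taken from a trained detector):

>>> import torch
>>> C = 4
>>> mu = torch.zeros(C)                    # stand-in background mean
>>> cov = torch.eye(C)                     # stand-in background covariance
>>> x = torch.randn(C)                     # one pixel spectrum
>>> xc = x - mu
>>> rx = xc @ torch.linalg.pinv(cov) @ xc  # squared Mahalanobis distance
>>> rx.shape
torch.Size([])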

Parameters:

num_channels : int
    Number of spectral channels in input data. Required.
eps : float, default 1e-6
    Small constant added to covariance diagonal for numerical stability.
cache_inverse : bool, default True
    If True, precompute and cache Σ⁻¹ for faster inference.
**kwargs : dict
    Additional arguments passed to Node base class.

Attributes:

mu : Tensor or Parameter
    Background mean spectrum, shape (C,). Initially a buffer; becomes a Parameter after unfreeze().
cov : Tensor or Parameter
    Background covariance matrix, shape (C, C).
cov_inv : Tensor or Parameter
    Cached pseudo-inverse of covariance (if cache_inverse=True).
_statistically_initialized : bool
    Flag indicating whether statistical_initialization() has been called.

Examples:

>>> from cuvis_ai.anomaly.rx_detector import RXGlobal
>>> from cuvis_ai_core.training import StatisticalTrainer
>>>
>>> # Create RX detector
>>> rx = RXGlobal(num_channels=61, eps=1.0e-6)
>>>
>>> # Phase 1: Statistical initialization
>>> stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
>>> stat_trainer.fit()  # Computes μ and Σ from training data
>>>
>>> # Inference with frozen statistics
>>> output = rx.forward(data=hyperspectral_cube)
>>> scores = output["scores"]  # [B, H, W, 1]
>>>
>>> # Phase 2: Optional gradient-based fine-tuning
>>> rx.unfreeze()  # Convert buffers to nn.Parameters
>>> # Now μ and Σ can be updated with gradient descent
See Also

RXPerBatch : Per-batch RX variant without training
MinMaxNormalizer : Recommended preprocessing before RX
ScoreToLogit : Convert scores to logits for classification
docs/tutorials/rx-statistical.md : Complete RX pipeline tutorial

Notes

After statistical_initialization(), mu and cov are stored as buffers (frozen by default). Call unfreeze() to convert them to trainable nn.Parameters for gradient-based optimization.

Source code in cuvis_ai/anomaly/rx_detector.py
def __init__(
    self, num_channels: int, eps: float = 1e-6, cache_inverse: bool = True, **kwargs
) -> None:
    self.num_channels = int(num_channels)
    self.eps = eps
    self.cache_inverse = cache_inverse
    # Call Node.__init__ directly with all parameters for proper serialization
    # We bypass RXBase.__init__ since it only accepts eps
    # Node.__init__(self, num_channels=self.num_channels, eps=self.eps, cache_inverse=self.cache_inverse)

    super().__init__(
        num_channels=self.num_channels, eps=self.eps, cache_inverse=self.cache_inverse, **kwargs
    )

    # global stats - all stored as buffers initially
    self.register_buffer("mu", torch.zeros(self.num_channels, dtype=torch.float32))  # (C,)
    self.register_buffer(
        "cov", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float32)
    )  # (C,C)
    self.register_buffer(
        "cov_inv", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float32)
    )  # (C,C)
    # Streaming accumulators (float64 for numerical stability)
    self.register_buffer("_mean", torch.zeros(self.num_channels, dtype=torch.float64))
    self.register_buffer(
        "_M2", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float64)
    )
    self._n = 0
    self._statistically_initialized = False
statistical_initialization
statistical_initialization(input_stream)

Initialize mu and Sigma from data iterator.

Parameters:

input_stream : InputStream
    Iterator yielding dicts matching INPUT_SPECS (port-based format).
    Expected format: {"data": tensor} where tensor is BHWC. Required.
Source code in cuvis_ai/anomaly/rx_detector.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize mu and Sigma from data iterator.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is BHWC
    """
    self.reset()
    for batch_data in input_stream:
        # Extract data from port-based dict
        x = batch_data["data"]
        if x is not None:
            self.update(x)

    if self._n > 0:
        self.finalize()
    self._statistically_initialized = True
unfreeze
unfreeze()

Convert mu and cov buffers to trainable nn.Parameters.

Call this method after fit() to enable gradient-based optimization of the mean and covariance statistics. They will be converted from buffers to nn.Parameters, allowing gradient updates during training.

Example

>>> rx.fit(input_stream)  # Statistical initialization
>>> rx.unfreeze()  # Enable gradient training
>>> # Now RX statistics can be fine-tuned with gradient descent
Source code in cuvis_ai/anomaly/rx_detector.py
def unfreeze(self) -> None:
    """Convert mu and cov buffers to trainable nn.Parameters.

    Call this method after fit() to enable gradient-based optimization of
    the mean and covariance statistics. They will be converted from buffers
    to nn.Parameters, allowing gradient updates during training.

    Example
    -------
    >>> rx.fit(input_stream)  # Statistical initialization
    >>> rx.unfreeze()  # Enable gradient training
    >>> # Now RX statistics can be fine-tuned with gradient descent
    """
    if self.mu.numel() > 0 and self.cov.numel() > 0:
        # Convert buffers to parameters
        self.mu = nn.Parameter(self.mu.clone(), requires_grad=True)
        self.cov = nn.Parameter(self.cov.clone(), requires_grad=True)
        if self.cov_inv.numel() > 0:
            self.cov_inv = nn.Parameter(self.cov_inv.clone(), requires_grad=True)
    # Call parent to enable requires_grad
    super().unfreeze()
update
update(batch_bhwc)

Update streaming statistics with a new batch using Welford's algorithm.

This method incrementally computes mean and covariance from batches of data using Welford's numerically stable online algorithm. Multiple batches can be processed sequentially before calling finalize() to compute final statistics.

Parameters:

batch_bhwc : Tensor
    Input batch in BHWC format, shape (B, H, W, C). Required.
Notes

The algorithm maintains running accumulators (_mean, _M2, _n) in float64 for numerical stability. Batches with ≤1 samples are ignored. After update(), call finalize() to compute mu and cov from the accumulated statistics.
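
As an illustrative check (a sketch, not part of the API; assumes the node can be constructed standalone), streaming updates over two batches match the mean of the concatenated data:

>>> import torch
>>> from cuvis_ai.anomaly.rx_detector import RXGlobal
>>> rx = RXGlobal(num_channels=3)
>>> a, b = torch.randn(2, 4, 4, 3), torch.randn(2, 4, 4, 3)  # two BHWC batches
>>> rx.update(a); rx.update(b)
>>> _ = rx.finalize()
>>> full = torch.cat([a, b]).reshape(-1, 3)  # all pixels pooled
>>> torch.allclose(rx.mu, full.mean(0).to(rx.mu.dtype), atol=1e-5)
True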

Source code in cuvis_ai/anomaly/rx_detector.py
@torch.no_grad()
def update(self, batch_bhwc: torch.Tensor) -> None:
    """Update streaming statistics with a new batch using Welford's algorithm.

    This method incrementally computes mean and covariance from batches of data
    using Welford's numerically stable online algorithm. Multiple batches can be
    processed sequentially before calling finalize() to compute final statistics.

    Parameters
    ----------
    batch_bhwc : torch.Tensor
        Input batch in BHWC format, shape (B, H, W, C)

    Notes
    -----
    The algorithm maintains running accumulators (_mean, _M2, _n) in float64
    for numerical stability. Batches with ≤1 samples are ignored. After update(),
    call finalize() to compute mu and cov from the accumulated statistics.
    """
    X = _flatten_bhwc(batch_bhwc).reshape(-1, batch_bhwc.shape[-1])  # (M,C)
    m = X.shape[0]
    if m <= 1:
        return
    mean_b = X.mean(0)
    M2_b = (X - mean_b).T @ (X - mean_b)
    if self._n == 0:
        self._n, self._mean, self._M2 = m, mean_b, M2_b
    else:
        n, tot = self._n, self._n + m
        delta = mean_b - self._mean
        new_mean = self._mean + delta * (m / tot)
        outer = torch.outer(delta, delta) * (n * m / tot)
        self._n, self._mean, self._M2 = tot, new_mean, self._M2 + M2_b + outer
    self._statistically_initialized = False
finalize
finalize()

Compute final mean and covariance from accumulated streaming statistics.

This method converts the running accumulators (_mean, _M2) into the final mean (mu) and covariance (cov) matrices. The covariance is regularized with eps * I for numerical stability, and optionally caches the pseudo-inverse.

Returns:

RXGlobal
    Returns self for method chaining.

Raises:

ValueError
    If fewer than 2 samples were accumulated (insufficient for covariance estimation).

Notes

After finalization, mu and cov are stored as buffers (frozen by default). Call unfreeze() to convert them to nn.Parameters for gradient-based training.

Source code in cuvis_ai/anomaly/rx_detector.py
@torch.no_grad()
def finalize(self) -> "RXGlobal":
    """Compute final mean and covariance from accumulated streaming statistics.

    This method converts the running accumulators (_mean, _M2) into the final
    mean (mu) and covariance (cov) matrices. The covariance is regularized with
    eps * I for numerical stability, and optionally caches the pseudo-inverse.

    Returns
    -------
    RXGlobal
        Returns self for method chaining

    Raises
    ------
    ValueError
        If fewer than 2 samples were accumulated (insufficient for covariance estimation)

    Notes
    -----
    After finalization, mu and cov are stored as buffers (frozen by default).
    Call unfreeze() to convert them to nn.Parameters for gradient-based training.
    """
    if self._n <= 1:
        raise ValueError("Not enough samples to finalize.")
    mu = self._mean.clone()
    cov = self._M2 / (self._n - 1)
    if self.eps > 0:
        cov = cov + self.eps * torch.eye(cov.shape[0], device=cov.device, dtype=cov.dtype)
    # Always store as buffers initially (frozen by default)
    self.mu = mu
    self.cov = cov
    if self.cache_inverse:
        self.cov_inv = torch.linalg.pinv(cov)
    else:
        self.cov_inv = torch.empty(0, 0)
    self._statistically_initialized = True
    return self
reset
reset()

Reset all statistics and accumulators to empty state.

Clears mu, cov, cov_inv, and all streaming accumulators (_mean, _M2, _n). After reset, the detector must be re-initialized via statistical_initialization() before it can be used for inference.

Notes

Use this method when you need to re-initialize the detector with different training data or when switching between different dataset distributions.

Source code in cuvis_ai/anomaly/rx_detector.py
def reset(self) -> None:
    """Reset all statistics and accumulators to empty state.

    Clears mu, cov, cov_inv, and all streaming accumulators (_mean, _M2, _n).
    After reset, the detector must be re-initialized via statistical_initialization()
    before it can be used for inference.

    Notes
    -----
    Use this method when you need to re-initialize the detector with different
    training data or when switching between different dataset distributions.
    """
    self.mu = torch.empty(0)
    self.cov = torch.empty(0, 0)
    self.cov_inv = torch.empty(0, 0)
    self._n = 0
    self._mean = torch.empty(0)
    self._M2 = torch.empty(0, 0)
    self._statistically_initialized = False
forward
forward(data, **_)

Forward pass computing anomaly scores.

Parameters:

data : Tensor
    Input tensor in BHWC format. Required.

Returns:

dict[str, Tensor]
    Dictionary with "scores" key containing BHW1 anomaly scores.

Source code in cuvis_ai/anomaly/rx_detector.py
def forward(self, data: torch.Tensor, **_) -> dict[str, torch.Tensor]:
    """Forward pass computing anomaly scores.

    Parameters
    ----------
    data : torch.Tensor
        Input tensor in BHWC format

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "scores" key containing BHW1 anomaly scores
    """
    if not self._statistically_initialized or self.mu.numel() == 0:
        raise RuntimeError(
            "RXGlobal not initialized. Call statistical_initialization() before inference."
        )
    B, H, W, C = data.shape
    N = H * W
    X = data.view(B, N, C)
    # Convert dtype if needed, but don't change device (assumes everything on same device)
    Xc = X - self.mu.to(X.dtype)
    if self.cov_inv.numel() > 0:
        cov_inv = self.cov_inv.to(X.dtype)
        md2 = torch.einsum("bnc,cd,bnd->bn", Xc, cov_inv, Xc)  # (B,N)
    else:
        md2 = self._quad_form_solve(Xc, self.cov.to(X.dtype))
    scores = md2.view(B, H, W).unsqueeze(-1)  # Add channel dimension (B,H,W,1)
    return {"scores": scores}
RXPerBatch
RXPerBatch(eps=1e-06, **kwargs)

Bases: RXBase

Computes μ and Σ per image in the batch on the fly; no fit/finalize step is required.
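
Because no statistics are fitted, the node can be used directly on a batch; a minimal usage sketch (shapes are illustrative, and it is assumed the node can be constructed standalone):

>>> import torch
>>> from cuvis_ai.anomaly.rx_detector import RXPerBatch
>>> rx = RXPerBatch(eps=1e-6)
>>> out = rx.forward(data=torch.randn(2, 8, 8, 5))  # BHWC input
>>> out["scores"].shape
torch.Size([2, 8, 8, 1])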

Source code in cuvis_ai/anomaly/rx_detector.py
def __init__(self, eps: float = 1e-6, **kwargs) -> None:
    self.eps = eps
    super().__init__(eps=eps, **kwargs)
forward
forward(data, **_)

Forward pass computing per-batch anomaly scores.

Parameters:

data : Tensor
    Input tensor in BHWC format. Required.

Returns:

dict[str, Tensor]
    Dictionary with "scores" key containing BHW1 anomaly scores.

Source code in cuvis_ai/anomaly/rx_detector.py
def forward(self, data: torch.Tensor, **_) -> dict[str, torch.Tensor]:
    """Forward pass computing per-batch anomaly scores.

    Parameters
    ----------
    data : torch.Tensor
        Input tensor in BHWC format

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "scores" key containing BHW1 anomaly scores
    """
    B, H, W, C = data.shape
    N = H * W
    X_flat = _flatten_bhwc(data)  # (B,N,C)
    mu = X_flat.mean(1, keepdim=True)  # (B,1,C)
    Xc = X_flat - mu
    cov = torch.matmul(Xc.transpose(1, 2), Xc) / max(N - 1, 1)  # (B,C,C)
    eye = torch.eye(C, device=data.device, dtype=data.dtype).expand(B, C, C)
    cov = cov + self.eps * eye
    md2 = self._quad_form_solve(Xc, cov)  # (B,N)
    scores = md2.view(B, H, W)
    return {"scores": scores.unsqueeze(-1)}

LAD Detector

lad_detector

Laplacian Anomaly Detector (LAD) for hyperspectral anomaly detection.

This module implements the Laplacian Anomaly Detector, a graph-based approach for detecting spectral anomalies in hyperspectral images. LAD constructs a spectral graph using Cauchy similarity weights and computes anomaly scores based on the graph Laplacian.

The LAD algorithm identifies anomalies by measuring how unusual a pixel's spectral signature is within the spectral manifold learned from background data. Unlike RX detectors that assume Gaussian distributions, LAD captures nonlinear manifold structures through graph construction.

Reference: Gu, Y., Liu, Y., & Zhang, Y. (2008). "A selective KPCA algorithm based on high-order statistics for anomaly detection in hyperspectral imagery." IEEE Geoscience and Remote Sensing Letters, 5(1), 43-47.
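
The graph construction described above can be sketched in a few lines of plain PyTorch (a simplified mirror of the finalize() code shown below; M is a stand-in mean spectrum):

>>> import torch
>>> M = torch.tensor([0.2, 0.5, 0.9], dtype=torch.float64)  # stand-in mean spectrum
>>> a = M.mean()
>>> A = 1.0 / (1.0 + ((M[:, None] - M[None, :]).abs() / a) ** 2)  # Cauchy similarity weights
>>> _ = A.fill_diagonal_(0.0)
>>> L = torch.diag(A.sum(1)) - A  # unnormalized graph Laplacian
>>> torch.allclose(L.sum(1), torch.zeros(3, dtype=torch.float64))  # rows sum to zero
True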

LADGlobal
LADGlobal(
    num_channels,
    eps=1e-08,
    normalize_laplacian=True,
    use_numpy_laplacian=True,
    **kwargs,
)

Bases: Node

Laplacian Anomaly Detector (global), variant 'C' (Cauchy), port-based.

This is the new cuvis.ai v3 implementation of the LAD detector. It follows the same mathematical definition as the legacy v2 LADGlobal, but exposes a port-based interface compatible with CuvisPipeline, StatisticalTrainer, and GradientTrainer.

Ports

INPUT_SPECS
    data : float32, shape (-1, -1, -1, -1)
        Input hyperspectral cube in BHWC format.
OUTPUT_SPECS
    scores : float32, shape (-1, -1, -1, 1)
        Per-pixel anomaly scores in BHW1 format.

Parameters:

num_channels : int
    Number of spectral channels in input data. Required.
eps : float, default 1e-8
    Small epsilon value for numerical stability in Laplacian construction.
normalize_laplacian : bool, default True
    If True, applies symmetric normalization: L = D^{-½} (D - A) D^{-½}. If False, uses the unnormalized Laplacian: L = D - A.
use_numpy_laplacian : bool, default True
    If True, constructs the Laplacian matrix using NumPy (float64, 1e-12 eps) for parity with reference implementations. If False, uses pure PyTorch.
Training

After statistical initialization via statistical_initialization(), the node can be made trainable by calling unfreeze(). This converts the mean M and Laplacian L buffers to trainable nn.Parameter objects, enabling gradient-based fine-tuning.

Example

>>> lad = LADGlobal(num_channels=61)
>>> stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
>>> stat_trainer.fit()  # Statistical initialization
>>> lad.unfreeze()  # Enable gradient training
>>> grad_trainer = GradientTrainer(pipeline=pipeline, datamodule=datamodule, ...)
>>> grad_trainer.fit()  # Gradient-based fine-tuning

Source code in cuvis_ai/anomaly/lad_detector.py
def __init__(
    self,
    num_channels: int,
    eps: float = 1e-8,
    normalize_laplacian: bool = True,
    use_numpy_laplacian: bool = True,
    **kwargs: Any,
) -> None:
    self.num_channels = int(num_channels)
    self.eps = float(eps)
    self.normalize_laplacian = bool(normalize_laplacian)
    self.use_numpy_laplacian = bool(use_numpy_laplacian)

    super().__init__(
        num_channels=self.num_channels,
        eps=self.eps,
        normalize_laplacian=self.normalize_laplacian,
        use_numpy_laplacian=self.use_numpy_laplacian,
        **kwargs,
    )

    # Streaming accumulators (float64 for numerical stability)
    self.register_buffer(
        "_mean_run", torch.zeros(self.num_channels, dtype=torch.float64)
    )  # (C,)
    self._count: int = 0
    # Model buffers
    self.register_buffer("M", torch.zeros(self.num_channels, dtype=torch.float64))  # (C,)
    self.register_buffer(
        "L", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float64)
    )  # (C, C)
    self._statistically_initialized = False
statistical_initialization
statistical_initialization(input_stream)

Compute global mean M and Laplacian L from a port-based input stream.

Parameters:

input_stream : InputStream
    Iterator yielding dicts matching INPUT_SPECS.
    Expected format: {"data": tensor} where tensor is BHWC. Required.
Source code in cuvis_ai/anomaly/lad_detector.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Compute global mean M and Laplacian L from a port-based input stream.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS.
        Expected format: ``{"data": tensor}`` where tensor is BHWC.
    """
    self.reset()

    for batch_data in input_stream:
        x = batch_data.get("data")
        if x is not None:
            self.update(x)

    if self._count <= 0:
        raise RuntimeError("No samples provided to LADGlobal.statistical_initialization()")

    self.finalize()
    self._statistically_initialized = True
update
update(batch_bhwc)

Update running mean statistics from a BHWC batch.

Source code in cuvis_ai/anomaly/lad_detector.py
@torch.no_grad()
def update(self, batch_bhwc: torch.Tensor) -> None:
    """Update running mean statistics from a BHWC batch."""
    B, H, W, C = batch_bhwc.shape
    X = batch_bhwc.reshape(B * H * W, C).to(dtype=torch.float64)
    m = X.shape[0]
    if m <= 0:
        return

    mean_b = X.mean(dim=0)

    if self._count == 0:
        self._mean_run = mean_b
        self._count = m
    else:
        tot = self._count + m
        delta = mean_b - self._mean_run
        self._mean_run = self._mean_run + delta * (m / tot)
        self._count = tot

    self._statistically_initialized = False
finalize
finalize()

Finalize mean and Laplacian from accumulated statistics.

Source code in cuvis_ai/anomaly/lad_detector.py
@torch.no_grad()
def finalize(self) -> None:
    """Finalize mean and Laplacian from accumulated statistics."""
    if self._count <= 0 or self._mean_run.numel() == 0:
        raise RuntimeError("No samples accumulated for LADGlobal.finalize()")

    M = self._mean_run.clone().to(dtype=torch.float64)
    C = M.shape[0]
    a = M.mean()

    if self.use_numpy_laplacian:
        # NumPy implementation for exact parity with legacy version
        M_np = M.detach().cpu().numpy()
        A_abs = np.abs(M_np[:, None] - M_np[None, :])
        a_np = float(M_np.mean())
        A_np = 1.0 / (1.0 + (A_abs / (a_np + 1e-12)) ** 2)
        np.fill_diagonal(A_np, 0.0)
        D_np = np.diag(A_np.sum(axis=1))
        L_np = D_np - A_np

        if self.normalize_laplacian:
            d_np = np.diag(D_np)
            d_inv_sqrt_np = np.where(d_np > 0, 1.0 / (np.sqrt(d_np) + 1e-12), 0.0)
            D_inv_sqrt_np = np.diag(d_inv_sqrt_np)
            L_np = D_inv_sqrt_np @ L_np @ D_inv_sqrt_np

        L = torch.from_numpy(L_np).to(dtype=torch.float64, device=M.device)
    else:
        Mi = M.view(C, 1)
        Mj = M.view(1, C)
        denom = a + torch.tensor(1e-12, dtype=torch.float64, device=M.device)
        diff = torch.abs(Mi - Mj) / denom
        A = 1.0 / (1.0 + diff.pow(2))
        A.fill_diagonal_(0.0)

        D = torch.diag(A.sum(dim=1))
        L = D - A

        if self.normalize_laplacian:
            d = torch.diag(D)
            d_inv_sqrt = torch.where(
                d > 0,
                1.0 / torch.sqrt(d + torch.tensor(1e-12, dtype=torch.float64, device=M.device)),
                torch.zeros_like(d),
            )
            D_inv_sqrt = torch.diag(d_inv_sqrt)
            L = D_inv_sqrt @ L @ D_inv_sqrt

    self.M = M
    self.L = L
    self._statistically_initialized = True
reset
reset()

Reset all statistics and model parameters to initial state.

Clears the streaming mean accumulator (_mean_run), sample count (_count), global mean (M), and Laplacian matrix (L). After reset, the detector must be re-initialized via statistical_initialization() before inference.

Notes

Use this method to re-initialize the detector with different training data or when switching between different spectral distributions.

Source code in cuvis_ai/anomaly/lad_detector.py
@torch.no_grad()
def reset(self) -> None:
    """Reset all statistics and model parameters to initial state.

    Clears the streaming mean accumulator (_mean_run), sample count (_count),
    global mean (M), and Laplacian matrix (L). After reset, the detector must
    be re-initialized via statistical_initialization() before inference.

    Notes
    -----
    Use this method to re-initialize the detector with different training data
    or when switching between different spectral distributions.
    """
    self.register_buffer(
        "_mean_run", torch.zeros(self.num_channels, dtype=torch.float64)
    )  # (C,)
    self._count: int = 0
    # Model buffers
    self.register_buffer("M", torch.zeros(self.num_channels, dtype=torch.float64))  # (C,)
    self.register_buffer(
        "L", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float64)
    )  # (C, C)
    self._statistically_initialized = False
unfreeze
unfreeze()

Convert M and L buffers to trainable nn.Parameters.

Source code in cuvis_ai/anomaly/lad_detector.py
def unfreeze(self) -> None:
    """Convert M and L buffers to trainable nn.Parameters."""
    if self.M.numel() > 0 and self.L.numel() > 0:
        device = self.M.device
        # Store current values
        M_data = self.M.clone()
        L_data = self.L.clone()

        # Remove buffer registrations
        delattr(self, "M")
        delattr(self, "L")

        # Register as parameters
        self.M = nn.Parameter(M_data, requires_grad=True)
        self.L = nn.Parameter(L_data, requires_grad=True)
        self.M.to(device=device)
        self.L.to(device=device)

    # Call parent to enable requires_grad
    super().unfreeze()
forward
forward(data, **_)

Compute LAD anomaly scores for a BHWC cube.

Parameters:

data : Tensor
    Input tensor in BHWC format. Required.

Returns:

dict[str, Tensor]
    Dictionary with key "scores" containing BHW1 anomaly scores.

Source code in cuvis_ai/anomaly/lad_detector.py
def forward(self, data: torch.Tensor, **_: Any) -> dict[str, torch.Tensor]:
    """Compute LAD anomaly scores for a BHWC cube.

    Parameters
    ----------
    data : torch.Tensor
        Input tensor in BHWC format.

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with key ``"scores"`` containing BHW1 anomaly scores.
    """
    if self.M.numel() == 0 or self.L.numel() == 0 or not self._statistically_initialized:
        raise RuntimeError(
            "LADGlobal not finalized. Call statistical_initialization() before forward()."
        )

    B, H, W, C = data.shape
    N = H * W

    X = data.view(B, N, C)

    Xc = X - self.M.to(dtype=X.dtype)
    L = self.L.to(dtype=X.dtype)

    scores = torch.einsum("bnc,cd,bnd->bn", Xc, L, Xc).view(B, H, W).unsqueeze(-1)
    return {"scores": scores}

Deep SVDD

deep_svdd

Deep SVDD encoder for the port-based cuvis.ai stack.

SpectralNet
SpectralNet(in_dim, rep_dim=32, hidden=128)

Bases: Module

Simple 2-layer MLP used by DeepSVDD to produce latent embeddings.

Source code in cuvis_ai/anomaly/deep_svdd.py
def __init__(self, in_dim: int, rep_dim: int = 32, hidden: int = 128) -> None:
    super().__init__()
    self.fc1 = nn.Linear(in_dim, hidden, bias=True)
    self.fc2 = nn.Linear(hidden, rep_dim, bias=False)

    nn.init.kaiming_uniform_(self.fc1.weight, a=math.sqrt(5))
    if self.fc1.bias is not None:
        fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.fc1.weight)
        bound = 1 / math.sqrt(fan_in)
        nn.init.uniform_(self.fc1.bias, -bound, bound)
    nn.init.xavier_uniform_(self.fc2.weight)
forward
forward(x)

Forward pass through two-layer spectral network.

Parameters:

x : Tensor
    Input features [B, C]. Required.

Returns:

Tensor
    Projected features [B, rep_dim].

Source code in cuvis_ai/anomaly/deep_svdd.py
def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Forward pass through two-layer spectral network.

    Parameters
    ----------
    x : torch.Tensor
        Input features [B, C].

    Returns
    -------
    torch.Tensor
        Projected features [B, rep_dim].
    """
    x = F.relu(self.fc1(x))
    return self.fc2(x)
RFFLayer
RFFLayer(input_dim, n_features=2048, gamma=0.1)

Bases: Module

Random Fourier feature encoder for RBF kernels.

Source code in cuvis_ai/anomaly/deep_svdd.py
def __init__(self, input_dim: int, n_features: int = 2048, gamma: float = 0.1) -> None:
    super().__init__()
    scale = math.sqrt(2.0 * float(gamma))
    W = torch.randn(input_dim, n_features, dtype=torch.get_default_dtype()) * scale
    b = torch.rand(n_features, dtype=torch.get_default_dtype()) * (2.0 * math.pi)
    self.register_buffer("W", W)
    self.register_buffer("b", b)
    self.register_buffer(
        "z_scale",
        torch.tensor(math.sqrt(2.0 / float(n_features)), dtype=torch.get_default_dtype()),
    )
forward
forward(x)

Compute random Fourier features for RBF kernel approximation.

Parameters:

x : Tensor
    Input features [B, input_dim]. Required.

Returns:

Tensor
    Random Fourier features [B, n_features].

Notes

Approximates RBF kernel via random Fourier features using: z(x) = sqrt(2/D) * cos(Wx + b) where W ~ N(0, 2*gamma*I).
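
As a quick sanity check (a sketch, not part of the module), the feature inner product approximates the RBF kernel exp(-gamma * ||x - y||²); the tolerance 0.1 is loose for n_features=4096:

>>> import math, torch
>>> from cuvis_ai.anomaly.deep_svdd import RFFLayer
>>> _ = torch.manual_seed(0)
>>> enc = RFFLayer(input_dim=8, n_features=4096, gamma=0.5)
>>> x, y = torch.randn(1, 8), torch.randn(1, 8)
>>> approx = (enc(x) * enc(y)).sum().item()          # z(x) · z(y)
>>> exact = math.exp(-0.5 * (x - y).pow(2).sum().item())
>>> abs(approx - exact) < 0.1
True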

Source code in cuvis_ai/anomaly/deep_svdd.py
def forward(self, x: torch.Tensor) -> torch.Tensor:
    """Compute random Fourier features for RBF kernel approximation.

    Parameters
    ----------
    x : torch.Tensor
        Input features [B, input_dim].

    Returns
    -------
    torch.Tensor
        Random Fourier features [B, n_features].

    Notes
    -----
    Approximates RBF kernel via random Fourier features using:
    z(x) = sqrt(2/D) * cos(Wx + b) where W ~ N(0, 2*gamma*I).
    """
    W: torch.Tensor = self.W  # type: ignore[assignment]
    b: torch.Tensor = self.b  # type: ignore[assignment]
    z_scale: torch.Tensor = self.z_scale  # type: ignore[assignment]
    proj = x @ W + b
    return z_scale * torch.cos(proj)
DeepSVDDProjection
DeepSVDDProjection(
    *,
    in_channels,
    rep_dim=32,
    hidden=128,
    kernel="linear",
    n_rff=2048,
    gamma=None,
    mlp_forward_batch_size=65536,
    **kwargs,
)

Bases: Node

Projection head that maps per-pixel features to Deep SVDD embeddings.
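
A minimal usage sketch (shapes are illustrative; assumes the node can be constructed standalone):

>>> import torch
>>> from cuvis_ai.anomaly.deep_svdd import DeepSVDDProjection
>>> proj = DeepSVDDProjection(in_channels=5, rep_dim=16)
>>> out = proj.forward(data=torch.randn(2, 8, 8, 5))  # BHWC input
>>> out["embeddings"].shape
torch.Size([2, 8, 8, 16])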

Source code in cuvis_ai/anomaly/deep_svdd.py
def __init__(
    self,
    *,
    in_channels: int,
    rep_dim: int = 32,
    hidden: int = 128,
    kernel: str = "linear",
    n_rff: int = 2048,
    gamma: float | None = None,
    mlp_forward_batch_size: int = 65_536,
    **kwargs: Any,
) -> None:
    if in_channels <= 0:
        raise ValueError(f"in_channels must be positive, got {in_channels}")
    self.in_channels = int(in_channels)
    self.rep_dim = int(rep_dim)
    self.hidden = int(hidden)
    self.kernel = str(kernel)
    self.n_rff = int(n_rff)
    self.gamma = None if gamma is None else float(gamma)
    self.mlp_forward_batch_size = max(1, int(mlp_forward_batch_size))

    super().__init__(
        in_channels=self.in_channels,
        rep_dim=self.rep_dim,
        hidden=self.hidden,
        kernel=self.kernel,
        n_rff=self.n_rff,
        gamma=self.gamma,
        mlp_forward_batch_size=self.mlp_forward_batch_size,
        **kwargs,
    )

    # Build projection network eagerly with known in_channels
    self._build_network()
forward
forward(data, **_)

Project BHWC features into a latent embedding space.

Source code in cuvis_ai/anomaly/deep_svdd.py
def forward(self, data: torch.Tensor, **_: Any) -> dict[str, torch.Tensor]:
    """Project BHWC features into a latent embedding space."""
    B, H, W, C = data.shape
    if C != self.in_channels:
        raise ValueError(f"Expected {self.in_channels} channels, got {C}")

    flat = data.contiguous().reshape(B * H * W, C)

    batch_size = self.mlp_forward_batch_size
    embeddings = []
    for start in range(0, flat.shape[0], batch_size):
        chunk = flat[start : start + batch_size]
        embeddings.append(self.net(chunk))
    z = torch.cat(embeddings, dim=0).reshape(B, H, W, self.rep_dim)

    return {"embeddings": z}
ZScoreNormalizerGlobal
ZScoreNormalizerGlobal(
    *,
    num_channels,
    sample_n=500000,
    seed=0,
    eps=1e-08,
    **kwargs,
)

Bases: Node

Port-based Deep SVDD z-score normalizer for BHWC cubes.
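
A minimal usage sketch (assumes a plain iterable of {"data": tensor} dicts is accepted as the input stream, as statistical_initialization() below suggests):

>>> import torch
>>> from cuvis_ai.anomaly.deep_svdd import ZScoreNormalizerGlobal
>>> zs = ZScoreNormalizerGlobal(num_channels=3)
>>> zs.statistical_initialization([{"data": torch.randn(2, 4, 4, 3)}])
>>> out = zs.forward(data=torch.randn(1, 4, 4, 3))
>>> out["normalized"].shape
torch.Size([1, 4, 4, 3])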

Source code in cuvis_ai/anomaly/deep_svdd.py
def __init__(
    self,
    *,
    num_channels: int,
    sample_n: int = 500_000,
    seed: int = 0,
    eps: float = 1e-8,
    **kwargs: Any,
) -> None:
    if num_channels <= 0:
        raise ValueError(f"num_channels must be positive, got {num_channels}")
    self.num_channels = int(num_channels)
    self.sample_n = int(sample_n)
    self.seed = int(seed)
    self.eps = float(eps)

    super().__init__(
        num_channels=self.num_channels,
        sample_n=self.sample_n,
        seed=self.seed,
        eps=self.eps,
        **kwargs,
    )

    # Pre-allocate buffers with known dimensions
    self.register_buffer(
        "zscore_mean", torch.zeros(num_channels, dtype=torch.get_default_dtype())
    )
    self.register_buffer(
        "zscore_std", torch.ones(num_channels, dtype=torch.get_default_dtype())
    )
requires_initial_fit property
requires_initial_fit

Whether this node requires statistical initialization from training data.

Returns:

bool
    Always True for Z-score normalization.

statistical_initialization
statistical_initialization(input_stream)

Estimate per-band z-score statistics from the provided stream.

Source code in cuvis_ai/anomaly/deep_svdd.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Estimate per-band z-score statistics from the provided stream."""
    pixels: list[np.ndarray] = []
    for batch in input_stream:
        data = batch.get("data")
        if data is None:
            continue
        data = data.contiguous()

        B, H, W, C = data.shape
        if C != self.num_channels:
            raise ValueError(f"Channel mismatch: expected {self.num_channels}, got {C}")
        # todo: https://cuvis.atlassian.net/browse/ALL-4971, later this will be in tensor
        # to avoid converting to numpy and back with streaming stats
        pixels.append(data.reshape(B * H * W, C).detach().cpu().numpy().astype(np.float32))

    if not pixels:
        raise RuntimeError(
            "DeepSVDDEncoder.statistical_initialization() did not receive any data"
        )

    X_all = np.concatenate(pixels, axis=0)
    n_stats = min(self.sample_n, X_all.shape[0])
    if n_stats < X_all.shape[0]:
        rng = np.random.default_rng(self.seed)
        idx = rng.choice(X_all.shape[0], size=n_stats, replace=False)
        X_stats = X_all[idx]
    else:
        X_stats = X_all

    mean, std = self._zscore_fit(X_stats, eps=self.eps)
    self.zscore_mean.copy_(torch.from_numpy(mean).squeeze(0))
    self.zscore_std.copy_(torch.from_numpy(std).squeeze(0))
    self._statistically_initialized = True
forward
forward(data, **_)

Apply per-channel Z-score normalization.

Parameters:

data : Tensor
    Input feature tensor [B, H, W, C]. Required.
**_ : Any
    Additional unused keyword arguments.

Returns:

dict[str, Tensor]
    Dictionary with "normalized" key containing Z-score normalized data [B, H, W, C].

Raises:

RuntimeError
    If statistical_initialization() has not been called.
ValueError
    If input channel count doesn't match initialized num_channels.

Source code in cuvis_ai/anomaly/deep_svdd.py
def forward(self, data: torch.Tensor, **_: Any) -> dict[str, torch.Tensor]:
    """Apply per-channel Z-score normalization.

    Parameters
    ----------
    data : torch.Tensor
        Input feature tensor [B, H, W, C].
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "normalized" key containing Z-score normalized data [B, H, W, C].

    Raises
    ------
    RuntimeError
        If statistical_initialization() has not been called.
    ValueError
        If input channel count doesn't match initialized num_channels.
    """
    if not self._statistically_initialized:
        raise RuntimeError(
            "DeepSVDDEncoder requires statistical_initialization() before forward()"
        )

    B, H, W, C = data.shape
    if C != self.num_channels:
        raise ValueError(f"Channel mismatch: expected {self.num_channels}, got {C}")

    flat = data.contiguous().reshape(B * H * W, C)
    mean = self.zscore_mean.to(dtype=flat.dtype, copy=False).unsqueeze(0)
    std = self.zscore_std.to(dtype=flat.dtype, copy=False).unsqueeze(0)
    flat = (flat - mean) / std

    normalized = flat.reshape(B, H, W, C)
    return {"normalized": normalized}
DeepSVDDScores

Bases: Node

Convert Deep SVDD embeddings + center vector into anomaly scores.
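
A minimal usage sketch (assumes the node takes no constructor arguments, as the class listing suggests):

>>> import torch
>>> from cuvis_ai.anomaly.deep_svdd import DeepSVDDScores
>>> scorer = DeepSVDDScores()
>>> out = scorer.forward(embeddings=torch.randn(2, 8, 8, 16), center=torch.zeros(16))
>>> out["scores"].shape
torch.Size([2, 8, 8, 1])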

forward
forward(embeddings, center, **_)

Compute anomaly scores as squared distance from center.

Parameters:

embeddings : Tensor
    Deep SVDD embeddings [B, H, W, D] from projection network. Required.
center : Tensor
    Center vector [D] from DeepSVDDCenterTracker. Required.
**_ : Any
    Additional unused keyword arguments.

Returns:

dict[str, Tensor]
    Dictionary with "scores" key containing squared distances [B, H, W, 1].

Source code in cuvis_ai/anomaly/deep_svdd.py
def forward(
    self, embeddings: torch.Tensor, center: torch.Tensor, **_: Any
) -> dict[str, torch.Tensor]:
    """Compute anomaly scores as squared distance from center.

    Parameters
    ----------
    embeddings : torch.Tensor
        Deep SVDD embeddings [B, H, W, D] from projection network.
    center : torch.Tensor
        Center vector [D] from DeepSVDDCenterTracker.
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "scores" key containing squared distances [B, H, W, 1].
    """
    scores = ((embeddings - center.view(1, 1, 1, -1)) ** 2).sum(dim=-1, keepdim=True)
    return {"scores": scores}
DeepSVDDCenterTracker
DeepSVDDCenterTracker(
    *, rep_dim, alpha=0.1, update_in_eval=False, **kwargs
)

Bases: Node

Track and expose Deep SVDD center statistics with optional logging.
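
During training the tracked center c follows an exponential moving average of the batch-mean embedding, c ← (1 − alpha) · c + alpha · mean(z) (see forward() below). A minimal usage sketch (same standalone-construction and plain-iterable assumptions as above):

>>> import torch
>>> from cuvis_ai.anomaly.deep_svdd import DeepSVDDCenterTracker
>>> tracker = DeepSVDDCenterTracker(rep_dim=16, alpha=0.1)
>>> tracker.statistical_initialization([{"embeddings": torch.randn(2, 8, 8, 16)}])
>>> tracker.forward(embeddings=torch.randn(2, 8, 8, 16))["center"].shape
torch.Size([16])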

Source code in cuvis_ai/anomaly/deep_svdd.py
def __init__(
    self, *, rep_dim: int, alpha: float = 0.1, update_in_eval: bool = False, **kwargs
) -> None:
    if rep_dim <= 0:
        raise ValueError(f"rep_dim must be positive, got {rep_dim}")
    if not (0.0 < alpha <= 1.0):
        raise ValueError("alpha must be in (0, 1]")
    self.rep_dim = int(rep_dim)
    self.alpha = float(alpha)
    self.update_in_eval = bool(update_in_eval)

    super().__init__(
        rep_dim=self.rep_dim, alpha=self.alpha, update_in_eval=self.update_in_eval, **kwargs
    )

    # Pre-allocate buffer with known dimensions
    self.register_buffer(
        "_tracked_center", torch.zeros(rep_dim, dtype=torch.get_default_dtype())
    )
requires_initial_fit property
requires_initial_fit

Whether this node requires statistical initialization from training data.

Returns:

bool
    Always True for center tracking initialization.

statistical_initialization
statistical_initialization(input_stream)

Initialize the Deep SVDD center from training embeddings.

Computes the mean embedding across all training samples to initialize the hypersphere center.

Parameters:

input_stream : InputStream
    Training data stream with embeddings [B, H, W, D]. Required.

Raises:

RuntimeError
    If no embeddings are received from the input stream.
ValueError
    If embedding dimensions don't match initialized rep_dim.

Source code in cuvis_ai/anomaly/deep_svdd.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize the Deep SVDD center from training embeddings.

    Computes the mean embedding across all training samples to initialize
    the hypersphere center.

    Parameters
    ----------
    input_stream : InputStream
        Training data stream with embeddings [B, H, W, D].

    Raises
    ------
    RuntimeError
        If no embeddings are received from the input stream.
    ValueError
        If embedding dimensions don't match initialized rep_dim.
    """
    total = None
    count = 0
    for batch in input_stream:
        embeddings = batch.get("embeddings")
        if embeddings is None:
            embeddings = batch.get("data")
        if embeddings is None:
            continue

        # Validate dimensions
        if embeddings.shape[-1] != self.rep_dim:
            raise ValueError(
                f"Embedding dimension mismatch: expected {self.rep_dim}, got {embeddings.shape[-1]}"
            )

        flat = embeddings.reshape(-1, embeddings.shape[-1])
        batch_sum = flat.sum(dim=0)
        total = batch_sum if total is None else total + batch_sum
        count += flat.shape[0]

    if total is None or count == 0:
        raise RuntimeError(
            "DeepSVDDCenterTracker.statistical_initialization() received no embeddings"
        )

    self._tracked_center.copy_((total / count).detach())
    self._statistically_initialized = True
forward
forward(embeddings, context=None, **_)

Track and output the Deep SVDD center with exponential moving average.

Updates the center using EMA during training (and optionally during eval), then outputs the current center and center norm metric.

Parameters:

embeddings : Tensor
    Deep SVDD embeddings [B, H, W, D]. Required.
context : Context, optional
    Execution context determining whether to update the center.
**_ : Any
    Additional unused keyword arguments.

Returns:

dict[str, Any]
    Dictionary with:
    - "center" : torch.Tensor [D] - Current tracked center
    - "metrics" : list[Metric] - Center norm metric

Raises:

RuntimeError
    If statistical_initialization() has not been called.
ValueError
    If embedding dimensions don't match initialized rep_dim.

Source code in cuvis_ai/anomaly/deep_svdd.py
def forward(
    self, embeddings: torch.Tensor, context: Context | None = None, **_: Any
) -> dict[str, Any]:
    """Track and output the Deep SVDD center with exponential moving average.

    Updates the center using EMA during training (and optionally during eval),
    then outputs the current center and center norm metric.

    Parameters
    ----------
    embeddings : torch.Tensor
        Deep SVDD embeddings [B, H, W, D].
    context : Context, optional
        Execution context determining whether to update center.
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, Any]
        Dictionary with:
        - "center" : torch.Tensor [D] - Current tracked center
        - "metrics" : list[Metric] - Center norm metric

    Raises
    ------
    RuntimeError
        If statistical_initialization() has not been called.
    ValueError
        If embedding dimensions don't match initialized rep_dim.
    """
    if not self._statistically_initialized:
        raise RuntimeError(
            "DeepSVDDCenterTracker requires statistical_initialization() before forward()"
        )

    # Validate dimensions
    if embeddings.shape[-1] != self.rep_dim:
        raise ValueError(
            f"Embedding dimension mismatch: expected {self.rep_dim}, got {embeddings.shape[-1]}"
        )

    batch_mean = embeddings.mean(dim=(0, 1, 2)).detach()
    should_update = context is None or context.stage is ExecutionStage.TRAIN
    if not should_update and self.update_in_eval and context is not None:
        should_update = context.stage in {ExecutionStage.VAL, ExecutionStage.TEST}

    if should_update:
        self._tracked_center.copy_(
            (1.0 - self.alpha) * self._tracked_center + self.alpha * batch_mean
        )

    metrics = []
    center_cpu = self._tracked_center.detach().cpu()
    metrics.append(
        Metric(
            name="deepsvdd_center/norm",
            value=float(center_cpu.norm().item()),
            stage=context.stage if context else ExecutionStage.INFERENCE,
            epoch=context.epoch if context else 0,
            batch_idx=context.batch_idx if context else 0,
        )
    )

    center_value = self._tracked_center.detach().clone()
    return {"center": center_value, "metrics": metrics}

Binary Decision Nodes

Nodes that convert anomaly scores into binary decisions (anomaly/normal).

Binary Decider

binary_decider

Binary decision nodes for thresholding anomaly scores and logits.

This module provides threshold-based decision nodes that convert continuous anomaly scores or logits into binary decisions (anomaly/normal). Two strategies are available:

  • BinaryDecider: Fixed threshold applied globally to sigmoid-transformed logits
  • QuantileBinaryDecider: Adaptive per-batch thresholding using quantile statistics

Decision nodes are typically placed at the end of anomaly detection pipelines to convert detector outputs into actionable binary masks for visualization or evaluation.

BinaryDecider
BinaryDecider(threshold=0.5, **kwargs)

Bases: BinaryDecider

Simple decider node using a static threshold to classify data.

Accepts logits as input, applies a sigmoid transformation to convert them to probabilities in [0, 1], then applies the threshold to produce binary decisions.
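
A probability threshold t on sigmoid outputs is equivalent to a raw-logit threshold of log(t / (1 − t)); with the default t = 0.5 this is a logit threshold of 0. A quick illustration of the sigmoid-then-threshold step (plain PyTorch, not library code):

>>> import torch
>>> logits = torch.tensor([-2.0, 0.0, 3.0])
>>> torch.sigmoid(logits) >= 0.5
tensor([False,  True,  True])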

Parameters:

threshold : float, default 0.5
    The threshold to use for classification after sigmoid. Values >= threshold are classified as anomalies (True).

Examples:

>>> from cuvis_ai.deciders.binary_decider import BinaryDecider
>>> import torch
>>>
>>> # Create decider with default threshold
>>> decider = BinaryDecider(threshold=0.5)
>>>
>>> # Apply to RX anomaly logits
>>> logits = torch.randn(4, 256, 256, 1)  # [B, H, W, C]
>>> output = decider.forward(logits=logits)
>>> decisions = output["decisions"]  # [4, 256, 256, 1] boolean mask
>>>
>>> # Use in pipeline
>>> pipeline.connect(
...     (logit_head.logits, decider.logits),
...     (decider.decisions, visualizer.mask),
... )
See Also

QuantileBinaryDecider : Adaptive per-batch thresholding ScoreToLogit : Convert scores to logits before decisioning

Source code in cuvis_ai/deciders/binary_decider.py
def __init__(self, threshold: float = 0.5, **kwargs) -> None:
    self.threshold = threshold
    # Forward threshold to BaseDecider so Serializable captures it in hparams
    super().__init__(threshold=threshold, **kwargs)
forward
forward(logits, **_)

Apply sigmoid and threshold-based decisioning on channels-last data.

Args: logits: Tensor shaped (B, H, W, C) containing logits.

Returns: Dictionary with "decisions" key containing (B, H, W, 1) decision mask.

Source code in cuvis_ai/deciders/binary_decider.py
def forward(
    self,
    logits: Tensor,
    **_: Any,
) -> dict[str, Tensor]:
    """Apply sigmoid and threshold-based decisioning on channels-last data.

    Args:
        logits: Tensor shaped (B, H, W, C) containing logits.

    Returns:
        Dictionary with "decisions" key containing (B, H, W, 1) decision mask.
    """

    # Apply sigmoid if needed to convert logits to probabilities
    tensor = torch.sigmoid(logits)

    # Apply threshold to get binary decisions
    decisions = tensor >= self.threshold
    return {"decisions": decisions}
QuantileBinaryDecider
QuantileBinaryDecider(
    quantile=0.995, reduce_dims=None, **kwargs
)

Bases: BinaryDecider

Quantile-based thresholding node operating on BHWC logits or scores.

This decider computes a tensor-valued threshold per batch item using the requested quantile over one or more non-batch dimensions, then produces a binary mask where values greater than or equal to that threshold are marked as anomalies. Useful for adaptive thresholding when score distributions vary across batches.

Parameters:

quantile : float, default 0.995
    Quantile in the closed interval [0, 1] used for the threshold computation. Higher values (e.g., 0.99, 0.995) are typical for anomaly detection to capture rare events.
reduce_dims : Sequence[int] | None, default None
    Axes (relative to the input tensor) over which to compute the quantile. When None, all non-batch dimensions (H, W, C) are reduced. For per-channel thresholds, use reduce_dims=[1, 2] (reduce H, W only).

Examples:

>>> from cuvis_ai.deciders.binary_decider import QuantileBinaryDecider
>>> import torch
>>>
>>> # Create quantile-based decider (99.5th percentile)
>>> decider = QuantileBinaryDecider(quantile=0.995)
>>>
>>> # Apply to anomaly scores
>>> scores = torch.randn(4, 256, 256, 1)  # [B, H, W, C]
>>> output = decider.forward(logits=scores)
>>> decisions = output["decisions"]  # [4, 256, 256, 1] boolean mask
>>>
>>> # Per-channel thresholding (reduce H, W only)
>>> decider_perchannel = QuantileBinaryDecider(
...     quantile=0.99,
...     reduce_dims=[1, 2],  # Compute threshold per channel
... )
See Also

BinaryDecider : Fixed threshold decisioning

Source code in cuvis_ai/deciders/binary_decider.py
def __init__(
    self,
    quantile: float = 0.995,
    reduce_dims: Sequence[int] | None = None,
    **kwargs,
) -> None:
    self._validate_quantile(quantile)
    self.quantile = float(quantile)
    self.reduce_dims = (
        tuple(int(dim) for dim in reduce_dims) if reduce_dims is not None else None
    )
    # Forward init params so Serializable records them for config serialization
    super().__init__(quantile=self.quantile, reduce_dims=self.reduce_dims, **kwargs)
forward
forward(logits, **_)

Apply quantile-based thresholding to produce binary decisions.

Computes per-batch thresholds using the specified quantile over reduce_dims, then classifies values >= threshold as anomalies.

Parameters:

logits : Tensor
    Input logits or anomaly scores, shape (B, H, W, C). Required.

Returns:

dict[str, Tensor]
    Dictionary containing:
    - "decisions" : Tensor
        Binary decision mask, shape (B, H, W, 1)

Source code in cuvis_ai/deciders/binary_decider.py
def forward(self, logits: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply quantile-based thresholding to produce binary decisions.

    Computes per-batch thresholds using the specified quantile over reduce_dims,
    then classifies values >= threshold as anomalies.

    Parameters
    ----------
    logits : Tensor
        Input logits or anomaly scores, shape (B, H, W, C)

    Returns
    -------
    dict[str, Tensor]
        Dictionary containing:
        - "decisions" : Tensor
            Binary decision mask, shape (B, H, W, 1)
    """
    tensor = logits
    reduce_dims = self._resolve_reduce_dims(tensor.dim())

    if len(reduce_dims) == 1:
        threshold = torch.quantile(
            tensor,
            self.quantile,
            dim=reduce_dims[0],
            keepdim=True,
        )
    else:
        tensor_ndim = tensor.dim()
        dims_to_keep = tuple(i for i in range(tensor_ndim) if i not in reduce_dims)
        new_order = (*dims_to_keep, *reduce_dims)
        permuted = tensor.permute(new_order)
        sizes_keep = [permuted.size(i) for i in range(len(dims_to_keep))]
        flattened = permuted.reshape(*sizes_keep, -1)
        threshold_flat = torch.quantile(
            flattened,
            self.quantile,
            dim=len(dims_to_keep),
            keepdim=True,
        )
        threshold_permuted = threshold_flat.reshape(
            *sizes_keep,
            *([1] * len(reduce_dims)),
        )
        inverse_order = [0] * tensor_ndim
        for original_idx, permuted_idx in enumerate(new_order):
            inverse_order[permuted_idx] = original_idx
        threshold = threshold_permuted.permute(*inverse_order)

    decisions = (tensor >= threshold).to(torch.bool)
    return {"decisions": decisions}

Two-Stage Decider

two_stage_decider

Two-Stage Binary Decision Module.

This module provides a two-stage binary decision node that first applies an image-level anomaly gate based on top-k statistics, then applies pixel-level quantile thresholding only for images that pass the gate.

This approach reduces false positives by filtering out images with low overall anomaly scores before applying pixel-level decisions.

See Also

cuvis_ai.deciders.binary_decider : Simple threshold-based binary decisions

TwoStageBinaryDecider
TwoStageBinaryDecider(
    image_threshold=0.5,
    top_k_fraction=0.001,
    quantile=0.995,
    reduce_dims=None,
    **kwargs,
)

Bases: BinaryDecider

Two-stage binary decider: image-level gate + pixel quantile mask.
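
A minimal usage sketch (parameter values and shapes are illustrative; assumes the node can be constructed standalone):

>>> import torch
>>> from cuvis_ai.deciders.two_stage_decider import TwoStageBinaryDecider
>>> decider = TwoStageBinaryDecider(image_threshold=0.5, top_k_fraction=0.01, quantile=0.99)
>>> out = decider.forward(logits=torch.rand(2, 32, 32, 1))
>>> out["decisions"].shape
torch.Size([2, 32, 32, 1])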

Source code in cuvis_ai/deciders/two_stage_decider.py
def __init__(
    self,
    image_threshold: float = 0.5,
    top_k_fraction: float = 0.001,
    quantile: float = 0.995,
    reduce_dims: Sequence[int] | None = None,
    **kwargs,
) -> None:
    if not 0.0 <= image_threshold <= 1.0:
        raise ValueError("image_threshold must be within [0, 1]")
    if not 0.0 < top_k_fraction <= 1.0:
        raise ValueError("top_k_fraction must be in (0, 1]")
    if not 0.0 <= quantile <= 1.0:
        raise ValueError("quantile must be within [0, 1]")

    self.image_threshold = float(image_threshold)
    self.top_k_fraction = float(top_k_fraction)
    self.quantile = float(quantile)
    self.reduce_dims = (
        tuple(int(dim) for dim in reduce_dims) if reduce_dims is not None else None
    )
    super().__init__(
        image_threshold=self.image_threshold,
        top_k_fraction=self.top_k_fraction,
        quantile=self.quantile,
        reduce_dims=self.reduce_dims,
        **kwargs,
    )
forward
forward(logits, **_)

Apply two-stage binary decision: image-level gate + pixel quantile.

Stage 1: Compute image-level anomaly score from top-k pixel scores. If below threshold, return blank mask (no anomalies).

Stage 2: For images passing the gate, apply pixel-level quantile thresholding to create binary anomaly mask.

Parameters:

logits : Tensor
    Anomaly scores [B, H, W, C] or [B, H, W, 1]. Required.
**_ : Any
    Additional unused keyword arguments.

Returns:

dict[str, Tensor]
    Dictionary with "decisions" key containing binary masks [B, H, W, 1].

Notes

The image-level score is computed as the mean of the top-k% highest pixel scores. For multi-channel inputs, the max across channels is used for each pixel.

Source code in cuvis_ai/deciders/two_stage_decider.py
def forward(self, logits: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply two-stage binary decision: image-level gate + pixel quantile.

    Stage 1: Compute image-level anomaly score from top-k pixel scores.
    If below threshold, return blank mask (no anomalies).

    Stage 2: For images passing the gate, apply pixel-level quantile
    thresholding to create binary anomaly mask.

    Parameters
    ----------
    logits : Tensor
        Anomaly scores [B, H, W, C] or [B, H, W, 1].
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "decisions" key containing binary masks [B, H, W, 1].

    Notes
    -----
    The image-level score is computed as the mean of the top-k% highest
    pixel scores. For multi-channel inputs, the max across channels is
    used for each pixel.
    """
    tensor = logits
    bsz = tensor.shape[0]

    # DEBUG: Log input tensor stats
    logger.debug(
        f"TwoStageDecider input: shape={tensor.shape}, device={tensor.device}, "
        f"dtype={tensor.dtype}, min={tensor.min().item():.6f}, "
        f"max={tensor.max().item():.6f}, mean={tensor.mean().item():.6f}"
    )

    decisions = []
    for b in range(bsz):
        scores = tensor[b]  # [H, W, C]
        # Reduce to per-pixel max for image score
        if scores.dim() == 3:
            pixel_scores = scores.max(dim=-1)[0]
        else:
            pixel_scores = scores
        flat = pixel_scores.reshape(-1)
        k = max(
            1,
            int(
                torch.ceil(
                    torch.tensor(flat.numel() * self.top_k_fraction, dtype=torch.float32)
                ).item()
            ),
        )
        topk_vals, _ = torch.topk(flat, k)
        image_score = topk_vals.mean().item()  # Convert to Python float for comparison

        # DEBUG: Log intermediate computation values
        logger.debug(
            f"TwoStageDecider[batch={b}]: k={k}, topk_min={topk_vals.min().item():.6f}, "
            f"topk_max={topk_vals.max().item():.6f}, image_score={image_score:.6f}"
        )

        # Stage 1: Image-level gate
        if image_score < self.image_threshold:
            # Gate failed: return blank mask
            logger.debug(
                f"TwoStageDecider: image_score={image_score:.6f} < threshold={self.image_threshold:.6f}, "
                f"returning blank mask"
            )
            decisions.append(
                torch.zeros((*pixel_scores.shape, 1), dtype=torch.bool, device=tensor.device)
            )
            continue

        # Stage 2: Gate passed, apply pixel-level quantile thresholding
        logger.debug(
            f"TwoStageDecider: image_score={image_score:.6f} >= threshold={self.image_threshold:.6f}, "
            f"applying quantile thresholding (q={self.quantile})"
        )
        # Compute quantile threshold: reduce over all dimensions to get scalar per batch item
        # This matches QuantileBinaryDecider behavior: for [B, H, W, C] it reduces over (H, W, C)
        # For single batch item [H, W, C], we reduce over all dims (0, 1, 2)
        threshold = torch.quantile(scores, self.quantile)

        # Apply threshold: for multi-channel scores, take max across channels first
        if scores.dim() == 3:  # [H, W, C]
            # Take max across channels to get per-pixel score, then threshold
            pixel_scores = scores.max(dim=-1, keepdim=False)[0]  # [H, W]
            binary_map = (pixel_scores >= threshold).unsqueeze(-1).to(torch.bool)  # [H, W, 1]
        else:  # [H, W] - single channel
            binary_map = (scores >= threshold).unsqueeze(-1).to(torch.bool)  # [H, W, 1]

        decisions.append(binary_map)

    return {"decisions": torch.stack(decisions, dim=0)}

Data & Preprocessing Nodes

Nodes for data loading, normalization, and preprocessing.

Data Loader

data

Data loading nodes for hyperspectral anomaly detection pipelines.

This module provides specialized data nodes that convert multi-class segmentation datasets into binary anomaly detection tasks. Data nodes handle type conversions, label mapping, and format transformations required for pipeline processing.

LentilsAnomalyDataNode
LentilsAnomalyDataNode(
    normal_class_ids, anomaly_class_ids=None, **kwargs
)

Bases: Node

Data node for Lentils anomaly detection dataset with binary label mapping.

Converts multi-class Lentils segmentation data to binary anomaly detection format. Maps specified class IDs to normal (0) or anomaly (1) labels, and handles type conversions from uint16 to float32 for hyperspectral cubes.

Parameters:

Name Type Description Default
normal_class_ids list[int]

List of class IDs to treat as normal background (e.g., [0, 1] for unlabeled and black lentils)

required
anomaly_class_ids list[int] | None

List of class IDs to treat as anomalies. If None, all classes not in normal_class_ids are treated as anomalies (default: None)

None
**kwargs dict

Additional arguments passed to Node base class

{}

Attributes:

Name Type Description
_binary_mapper BinaryAnomalyLabelMapper

Internal label mapper for converting multi-class to binary masks

Examples:

>>> from cuvis_ai.node.data import LentilsAnomalyDataNode
>>> from cuvis_ai_core.data.datasets import SingleCu3sDataModule
>>>
>>> # Create datamodule for Lentils dataset
>>> datamodule = SingleCu3sDataModule(
...     data_dir="data/lentils",
...     batch_size=4,
... )
>>>
>>> # Create data node with normal class specification
>>> data_node = LentilsAnomalyDataNode(
...     normal_class_ids=[0, 1],  # Unlabeled and black lentils are normal
... )
>>>
>>> # Use in pipeline
>>> pipeline.add_node(data_node)
>>> pipeline.connect(
...     (data_node.cube, normalizer.data),
...     (data_node.mask, metrics.targets),
... )
See Also

BinaryAnomalyLabelMapper : Label mapping utility used internally
SingleCu3sDataModule : DataModule for loading CU3S hyperspectral data
docs/tutorials/rx-statistical.md : Complete example with LentilsAnomalyDataNode

Notes

The node performs the following transformations:

- Converts hyperspectral cube from uint16 to float32
- Maps multi-class mask [B, H, W] to binary mask [B, H, W, 1]
- Extracts wavelengths from first batch element (assumes consistent wavelengths)

Source code in cuvis_ai/node/data.py
def __init__(
    self, normal_class_ids: list[int], anomaly_class_ids: list[int] | None = None, **kwargs
) -> None:
    super().__init__(
        normal_class_ids=normal_class_ids, anomaly_class_ids=anomaly_class_ids, **kwargs
    )

    self._binary_mapper = BinaryAnomalyLabelMapper(  # could also have been used as a standalone node
        normal_class_ids=normal_class_ids,
        anomaly_class_ids=anomaly_class_ids,
    )
forward
forward(cube, mask=None, wavelengths=None, **_)

Process hyperspectral cube and convert labels to binary anomaly format.

Parameters:

Name Type Description Default
cube Tensor

Input hyperspectral cube, shape (B, H, W, C), dtype uint16

required
mask Tensor | None

Multi-class segmentation mask, shape (B, H, W), dtype int32. If None, only cube is returned (default: None)

None
wavelengths Tensor | None

Wavelengths for each channel, shape (B, C), dtype int32. If None, wavelengths are not included in output (default: None)

None

Returns:

Type Description
dict[str, Tensor | ndarray]

Dictionary containing:

- "cube" : torch.Tensor
    Converted hyperspectral cube, shape (B, H, W, C), dtype float32
- "mask" : torch.Tensor (optional)
    Binary anomaly mask, shape (B, H, W, 1), dtype bool. Only included if input mask is provided.
- "wavelengths" : np.ndarray (optional)
    Wavelength array, shape (C,), dtype int32. Only included if input wavelengths are provided.

Source code in cuvis_ai/node/data.py
def forward(
    self,
    cube: torch.Tensor,
    mask: torch.Tensor | None = None,
    wavelengths: torch.Tensor | None = None,
    **_: Any,
) -> dict[str, torch.Tensor | np.ndarray]:
    """Process hyperspectral cube and convert labels to binary anomaly format.

    Parameters
    ----------
    cube : torch.Tensor
        Input hyperspectral cube, shape (B, H, W, C), dtype uint16
    mask : torch.Tensor | None, optional
        Multi-class segmentation mask, shape (B, H, W), dtype int32.
        If None, only cube is returned (default: None)
    wavelengths : torch.Tensor | None, optional
        Wavelengths for each channel, shape (B, C), dtype int32.
        If None, wavelengths are not included in output (default: None)

    Returns
    -------
    dict[str, torch.Tensor | np.ndarray]
        Dictionary containing:
        - "cube" : torch.Tensor
            Converted hyperspectral cube, shape (B, H, W, C), dtype float32
        - "mask" : torch.Tensor (optional)
            Binary anomaly mask, shape (B, H, W, 1), dtype bool.
            Only included if input mask is provided.
        - "wavelengths" : np.ndarray (optional)
            Wavelength array, shape (C,), dtype int32.
            Only included if input wavelengths are provided.
    """
    result: dict[str, torch.Tensor | np.ndarray] = {"cube": cube.to(torch.float32)}

    # Wavelength passthrough; could verify that all batch elements share the same wavelengths
    # input B x C -> output C
    if wavelengths is not None:
        result["wavelengths"] = wavelengths[0].cpu().numpy()

    if mask is not None:
        # Add channel dimension for mapper: BHW -> BHWC
        mask_4d = mask.unsqueeze(-1)

        # Always apply binary mapper
        mapped = self._binary_mapper.forward(
            cube=cube,
            mask=mask_4d,
            **_,  # Pass through additional kwargs
        )
        result["mask"] = mapped["mask"]  # Already BHWC bool

    return result
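
For reference, a minimal sketch of the forward contract (random tensors stand in for real CU3S data; the uint16 cube is emulated with int32 here, since torch.randint does not produce uint16):

>>> import torch
>>> node = LentilsAnomalyDataNode(normal_class_ids=[0, 1])
>>> cube = torch.randint(0, 4096, (2, 64, 64, 61), dtype=torch.int32)
>>> mask = torch.randint(0, 5, (2, 64, 64), dtype=torch.int32)
>>> out = node.forward(cube=cube, mask=mask)
>>> out["cube"].dtype, out["mask"].shape
(torch.float32, torch.Size([2, 64, 64, 1]))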

Normalization

normalization

Differentiable normalization nodes for BHWC hyperspectral data.

This module provides a collection of normalization nodes designed for hyperspectral imaging pipelines. All normalizers operate on BHWC format ([batch, height, width, channels]) and maintain gradient flow for end-to-end training.

Normalization strategies:

  • MinMaxNormalizer: Scales data to [0, 1] range using min-max statistics
  • ZScoreNormalizer: Standardizes data to zero mean and unit variance
  • SigmoidNormalizer: Applies sigmoid transformation with median centering
  • PerPixelUnitNorm: L2 normalization per pixel across channels
  • IdentityNormalizer: No-op passthrough for testing or baseline comparisons
  • SigmoidTransform: General-purpose sigmoid for logits→probabilities

Why Normalize?

Normalization is critical for stable anomaly detection and deep learning:

  1. Stable covariance estimation: RX detectors require well-conditioned covariance matrices
  2. Gradient stability: Prevents exploding/vanishing gradients during training
  3. Comparable scales: Ensures different spectral ranges contribute equally
  4. Faster convergence: Accelerates gradient-based optimization

BHWC Format Requirement

All normalizers expect BHWC input format. For HWC tensors, add a batch dimension:

hwc_tensor = torch.randn(256, 256, 61)  # [H, W, C]
bhwc_tensor = hwc_tensor.unsqueeze(0)   # [1, H, W, C]

IdentityNormalizer
IdentityNormalizer(**kwargs)

Bases: _ScoreNormalizerBase

No-op normalizer; preserves incoming scores.

Source code in cuvis_ai/node/normalization.py
def __init__(self, **kwargs) -> None:
    super().__init__(**kwargs)
MinMaxNormalizer
MinMaxNormalizer(
    eps=1e-06, use_running_stats=True, **kwargs
)

Bases: _ScoreNormalizerBase

Min-max normalization per sample and channel (keeps gradients).

Scales data to [0, 1] range using (x - min) / (max - min) transformation. Can operate in two modes:

  1. Per-sample normalization (use_running_stats=False): min/max computed per batch
  2. Global normalization (use_running_stats=True): uses running statistics from statistical initialization

Parameters:

Name Type Description Default
eps float

Small constant for numerical stability, prevents division by zero (default: 1e-6)

1e-06
use_running_stats bool

If True, use global min/max from statistical_initialization(). If False, compute min/max per batch during forward pass (default: True)

True
**kwargs dict

Additional arguments passed to Node base class

{}

Attributes:

Name Type Description
running_min Tensor

Global minimum value computed during statistical initialization

running_max Tensor

Global maximum value computed during statistical initialization

Examples:

>>> from cuvis_ai.node.normalization import MinMaxNormalizer
>>> from cuvis_ai_core.training import StatisticalTrainer
>>> import torch
>>>
>>> # Mode 1: Global normalization with statistical initialization
>>> normalizer = MinMaxNormalizer(eps=1.0e-6, use_running_stats=True)
>>> stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
>>> stat_trainer.fit()  # Computes global min/max from training data
>>>
>>> # Inference uses global statistics
>>> output = normalizer.forward(data=hyperspectral_cube)
>>> normalized = output["normalized"]  # [B, H, W, C], values in [0, 1]
>>>
>>> # Mode 2: Per-sample normalization (no initialization required)
>>> normalizer_local = MinMaxNormalizer(use_running_stats=False)
>>> output = normalizer_local.forward(data=hyperspectral_cube)
>>> # Each sample normalized independently using its own min/max
See Also

ZScoreNormalizer : Z-score standardization
SigmoidNormalizer : Sigmoid-based normalization
docs/tutorials/rx-statistical.md : RX pipeline with MinMaxNormalizer

Notes

Global normalization (use_running_stats=True) is recommended for RX detectors to ensure consistent scaling between training and inference. Per-sample normalization can be useful for real-time processing when training data is unavailable.

Source code in cuvis_ai/node/normalization.py
def __init__(self, eps: float = 1e-6, use_running_stats: bool = True, **kwargs) -> None:
    self.eps = float(eps)
    self.use_running_stats = use_running_stats
    super().__init__(eps=eps, use_running_stats=use_running_stats, **kwargs)

    # Running statistics for global normalization
    self.register_buffer("running_min", torch.tensor(float("nan")))
    self.register_buffer("running_max", torch.tensor(float("nan")))

    # Only require initialization when running stats are requested
    self._requires_initial_fit_override = self.use_running_stats
statistical_initialization
statistical_initialization(input_stream)

Compute global min/max from data iterator.

Parameters:

Name Type Description Default
input_stream InputStream

Iterator yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"data": tensor}, where tensor is the scores/data.

required
Source code in cuvis_ai/node/normalization.py
def statistical_initialization(self, input_stream) -> None:
    """Compute global min/max from data iterator.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is the scores/data
    """
    all_mins = []
    all_maxs = []

    for batch_data in input_stream:
        # Extract data from port-based dict
        x = batch_data.get("data")
        if x is not None:
            # Flatten spatial dimensions
            flat = x.reshape(x.shape[0], -1)
            batch_min = flat.min()
            batch_max = flat.max()
            all_mins.append(batch_min)
            all_maxs.append(batch_max)

    if all_mins:
        self.running_min = torch.stack(all_mins).min()
        self.running_max = torch.stack(all_maxs).max()
        self._statistically_initialized = True
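
Outside a full pipeline, statistical_initialization can be driven directly with any iterable of port-based dicts, as described in the docstring above. A minimal sketch:

>>> import torch
>>> normalizer = MinMaxNormalizer(use_running_stats=True)
>>> stream = [{"data": torch.rand(2, 8, 8, 61)} for _ in range(4)]
>>> normalizer.statistical_initialization(stream)
>>> bool(normalizer.running_min <= normalizer.running_max)
True
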
SigmoidNormalizer
SigmoidNormalizer(std_floor=1e-06, **kwargs)

Bases: _ScoreNormalizerBase

Median-centered sigmoid squashing per sample and channel.

Applies sigmoid transformation centered at the median with standard deviation scaling:

sigmoid((x - median) / std)

Produces values in [0, 1] range with median mapped to 0.5.

Parameters:

Name Type Description Default
std_floor float

Minimum standard deviation threshold to prevent division by zero (default: 1e-6)

1e-06
**kwargs dict

Additional arguments passed to Node base class

{}

Examples:

>>> from cuvis_ai.node.normalization import SigmoidNormalizer
>>> import torch
>>>
>>> # Create sigmoid normalizer
>>> normalizer = SigmoidNormalizer(std_floor=1.0e-6)
>>>
>>> # Apply to hyperspectral data
>>> data = torch.randn(4, 256, 256, 61)  # [B, H, W, C]
>>> output = normalizer.forward(data=data)
>>> normalized = output["normalized"]  # [4, 256, 256, 61], values in [0, 1]
See Also

MinMaxNormalizer : Min-max scaling to [0, 1]
ZScoreNormalizer : Z-score standardization

Notes

Sigmoid normalization is robust to outliers because extreme values are squashed asymptotically to 0 or 1. This makes it suitable for data with heavy-tailed distributions or sporadic anomalies.

Source code in cuvis_ai/node/normalization.py
def __init__(self, std_floor: float = 1e-6, **kwargs) -> None:
    self.std_floor = float(std_floor)
    super().__init__(std_floor=std_floor, **kwargs)
ZScoreNormalizer
ZScoreNormalizer(
    dims=None, eps=1e-06, keepdim=True, **kwargs
)

Bases: _ScoreNormalizerBase

Z-score (standardization) normalization along specified dimensions.

Computes: (x - mean) / (std + eps) along specified dims. Per-sample normalization with no statistical initialization required.

Parameters:

Name Type Description Default
dims list[int]

Dimensions to compute statistics over (default: [1,2] for H,W in BHWC format)

None
eps float

Small constant for numerical stability (default: 1e-6)

1e-06
keepdim bool

Whether to keep reduced dimensions (default: True)

True

Examples:

>>> # Normalize over spatial dimensions (H, W)
>>> zscore = ZScoreNormalizer(dims=[1, 2])
>>>
>>> # Normalize over all spatial and channel dimensions
>>> zscore_all = ZScoreNormalizer(dims=[1, 2, 3])
Source code in cuvis_ai/node/normalization.py
def __init__(
    self, dims: list[int] | None = None, eps: float = 1e-6, keepdim: bool = True, **kwargs
) -> None:
    self.dims = dims if dims is not None else [1, 2]
    self.eps = float(eps)
    self.keepdim = keepdim
    super().__init__(dims=self.dims, eps=eps, keepdim=keepdim, **kwargs)
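
A short forward sketch (the "normalized" output key is an assumption, matching the other normalizers in this module):

>>> import torch
>>> zscore = ZScoreNormalizer(dims=[1, 2])
>>> data = torch.randn(4, 32, 32, 61)
>>> normalized = zscore.forward(data=data)["normalized"]
>>> normalized.shape  # per-sample, per-channel zero mean / unit variance over H, W
torch.Size([4, 32, 32, 61])
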
SigmoidTransform
SigmoidTransform(**kwargs)

Bases: Node

Applies sigmoid transformation to convert logits to probabilities [0,1].

General-purpose sigmoid node for converting raw scores/logits to probability space. Useful for visualization or downstream nodes that expect bounded [0,1] values.

Examples:

>>> sigmoid = SigmoidTransform()
>>> # Route logits to both loss (raw) and visualization (sigmoid)
>>> graph.connect(
...     (rx.scores, loss_node.predictions),  # Raw logits to loss
...     (rx.scores, sigmoid.data),           # Logits to sigmoid
...     (sigmoid.transformed, viz.scores),   # Probabilities to viz
... )
Source code in cuvis_ai/node/normalization.py
def __init__(self, **kwargs) -> None:
    super().__init__(**kwargs)
forward
forward(data, **_)

Apply sigmoid transformation.

Parameters:

Name Type Description Default
data Tensor

Input tensor

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "transformed" key containing sigmoid output

Source code in cuvis_ai/node/normalization.py
def forward(self, data: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply sigmoid transformation.

    Parameters
    ----------
    data : Tensor
        Input tensor

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "transformed" key containing sigmoid output
    """
    return {"transformed": torch.sigmoid(data)}
PerPixelUnitNorm
PerPixelUnitNorm(eps=1e-08, **kwargs)

Bases: _ScoreNormalizerBase

Per-pixel mean-centering and L2 normalization across channels.

Source code in cuvis_ai/node/normalization.py
def __init__(self, eps: float = 1e-8, **kwargs) -> None:
    self.eps = float(eps)
    super().__init__(eps=self.eps, **kwargs)
forward
forward(data, **_)

Normalize BHWC tensors per pixel.

Source code in cuvis_ai/node/normalization.py
def forward(self, data: Tensor, **_: Any) -> dict[str, Tensor]:
    """Normalize BHWC tensors per pixel."""
    normalized = self._normalize(data)
    return {"normalized": normalized}

Preprocessors

preprocessors

Preprocessing Nodes.

This module provides nodes for preprocessing hyperspectral data, including wavelength-based band selection and filtering. These nodes help reduce dimensionality and focus analysis on specific spectral regions of interest.

See Also

cuvis_ai.node.band_selection : Advanced band selection methods
cuvis_ai.node.normalization : Normalization and standardization nodes

BandpassByWavelength
BandpassByWavelength(
    min_wavelength_nm, max_wavelength_nm=None, **kwargs
)

Bases: Node

Select channels by wavelength interval from BHWC tensors.

This node filters hyperspectral data by keeping only channels within a specified wavelength range. Wavelengths must be provided via the input port.

Parameters:

Name Type Description Default
min_wavelength_nm float

Minimum wavelength (inclusive) to keep, in nanometers

required
max_wavelength_nm float | None

Maximum wavelength (inclusive) to keep. If None, selects all wavelengths >= min_wavelength_nm. Default: None

None

Examples:

>>> # Create bandpass node
>>> bandpass = BandpassByWavelength(
...     min_wavelength_nm=500.0,
...     max_wavelength_nm=700.0,
... )
>>> # Filter cube in BHWC format with wavelengths from input port
>>> wavelengths_tensor = torch.from_numpy(wavelengths).float()
>>> filtered = bandpass.forward(data=cube_bhwc, wavelengths=wavelengths_tensor)["filtered"]
>>>
>>> # For single HWC images, add a batch dimension first:
>>> # filtered = bandpass.forward(data=cube_hwc.unsqueeze(0), wavelengths=wavelengths_tensor)["filtered"]
>>>
>>> # Use with wavelengths from upstream node
>>> pipeline.connect(
...     (data_node.outputs.cube, bandpass.data),
...     (data_node.outputs.wavelengths, bandpass.wavelengths),
... )
Source code in cuvis_ai/node/preprocessors.py
def __init__(
    self,
    min_wavelength_nm: float,
    max_wavelength_nm: float | None = None,
    **kwargs,
) -> None:
    self.min_wavelength_nm = float(min_wavelength_nm)
    self.max_wavelength_nm = float(max_wavelength_nm) if max_wavelength_nm is not None else None

    super().__init__(
        min_wavelength_nm=self.min_wavelength_nm,
        max_wavelength_nm=self.max_wavelength_nm,
        **kwargs,
    )
forward
forward(data, wavelengths, **kwargs)

Filter cube by wavelength range.

Parameters:

Name Type Description Default
data Tensor

Input hyperspectral cube [B, H, W, C].

required
wavelengths Tensor

Wavelengths tensor [C] in nanometers.

required
**kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
dict[str, Tensor]

Dictionary with "filtered" key containing filtered cube [B, H, W, C_filtered]

Raises:

Type Description
ValueError

If no channels are selected by the provided wavelength range

Source code in cuvis_ai/node/preprocessors.py
def forward(self, data: Tensor, wavelengths: Tensor, **kwargs: Any) -> dict[str, Tensor]:
    """Filter cube by wavelength range.

    Parameters
    ----------
    data : Tensor
        Input hyperspectral cube [B, H, W, C].
    wavelengths : Tensor
        Wavelengths tensor [C] in nanometers.
    **kwargs : Any
        Additional keyword arguments (unused).

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "filtered" key containing filtered cube [B, H, W, C_filtered]

    Raises
    ------
    ValueError
        If no channels are selected by the provided wavelength range
    """

    # Create mask for wavelength range
    if self.max_wavelength_nm is None:
        keep_mask = wavelengths >= self.min_wavelength_nm
    else:
        keep_mask = (wavelengths >= self.min_wavelength_nm) & (
            wavelengths <= self.max_wavelength_nm
        )

    if keep_mask.sum().item() == 0:
        raise ValueError("No channels selected by the provided wavelength range")

    # Filter cube
    filtered = data[..., keep_mask]

    return {"filtered": filtered}

Conversion

conversion

RX Logit Head for Anomaly Detection

This module provides a trainable head that converts RX anomaly scores into logits for binary anomaly classification. It can be trained end-to-end with binary cross-entropy loss.

ScoreToLogit
ScoreToLogit(init_scale=1.0, init_bias=0.0, **kwargs)

Bases: Node

Trainable head that converts RX scores to anomaly logits.

This node takes RX anomaly scores (typically Mahalanobis distances) and applies a learned affine transformation to produce logits suitable for binary classification with BCEWithLogitsLoss.

The transformation is: logit = scale * (score - bias)

Parameters:

Name Type Description Default
init_scale float

Initial value for the scale parameter

1.0
init_bias float

Initial value for the bias parameter (threshold)

0.0

Attributes:

Name Type Description
scale Parameter or Tensor

Scale factor applied to scores

bias Parameter or Tensor

Bias (threshold) subtracted from scores before scaling

Examples:

>>> # After RX detector
>>> rx = RXGlobal(num_channels=61, eps=1e-6)
>>> logit_head = ScoreToLogit(init_scale=1.0, init_bias=5.0)
>>> logit_head.unfreeze()  # Enable gradient training
>>> graph.connect(rx.scores, logit_head.scores)
Source code in cuvis_ai/node/conversion.py
def __init__(
    self,
    init_scale: float = 1.0,
    init_bias: float = 0.0,
    **kwargs,
) -> None:
    self.init_scale = init_scale
    self.init_bias = init_bias

    super().__init__(
        init_scale=init_scale,
        init_bias=init_bias,
        **kwargs,
    )

    # Initialize as buffers (frozen by default)
    self.register_buffer("scale", torch.tensor(init_scale, dtype=torch.float32))
    self.register_buffer("bias", torch.tensor(init_bias, dtype=torch.float32))

    # Streaming accumulators for statistics (similar to RXGlobal)
    self.register_buffer("_mean", torch.tensor(float("nan")))
    self.register_buffer("_M2", torch.tensor(float("nan")))
    self.register_buffer("_n", torch.tensor(0, dtype=torch.long))
    # Allow using the head with the provided init_scale/init_bias without forcing a fit()
    self._statistically_initialized = True
unfreeze
unfreeze()

Convert scale and bias buffers to trainable nn.Parameters.

Call this method to enable gradient-based optimization of the scale and bias parameters. They will be converted from buffers to nn.Parameters, allowing gradient updates during training.

Example

>>> logit_head = ScoreToLogit(init_scale=1.0, init_bias=5.0)
>>> logit_head.unfreeze()  # Enable gradient training
>>> # Now scale and bias can be optimized
Source code in cuvis_ai/node/conversion.py
def unfreeze(self) -> None:
    """Convert scale and bias buffers to trainable nn.Parameters.

    Call this method to enable gradient-based optimization of the
    scale and bias parameters. They will be converted from buffers to
    nn.Parameters, allowing gradient updates during training.

    Example
    -------
    >>> logit_head = ScoreToLogit(init_scale=1.0, init_bias=5.0)
    >>> logit_head.unfreeze()  # Enable gradient training
    >>> # Now scale and bias can be optimized
    """
    if self.scale is not None and self.bias is not None:
        # Convert buffers to parameters
        self.scale = nn.Parameter(self.scale.clone(), requires_grad=True)
        self.bias = nn.Parameter(self.bias.clone(), requires_grad=True)
    # Call parent to enable requires_grad
    super().unfreeze()
statistical_initialization
statistical_initialization(input_stream)

Initialize bias from statistics of RX scores using streaming approach.

Uses Welford's algorithm for numerically stable online computation of mean and standard deviation, similar to RXGlobal.

Parameters:

Name Type Description Default
input_stream InputStream

Iterator yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"scores": tensor}, where tensor is the RX scores.

required
Source code in cuvis_ai/node/conversion.py
def statistical_initialization(self, input_stream) -> None:
    """Initialize bias from statistics of RX scores using streaming approach.

    Uses Welford's algorithm for numerically stable online computation of
    mean and standard deviation, similar to RXGlobal.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"scores": tensor} where tensor is the RX scores
    """
    self.reset()
    for batch_data in input_stream:
        # Extract scores from port-based dict
        scores = batch_data.get("scores")
        if scores is not None:
            self.update(scores)

    if self._n > 0:
        self.finalize()
    self._statistically_initialized = True
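
As with the normalizers, the input stream can be any iterable of port-based dicts. A sketch; the numeric comment assumes roughly Gaussian scores:

>>> import torch
>>> head = ScoreToLogit()
>>> stream = [{"scores": torch.randn(2, 16, 16, 1) * 0.5 + 2.0} for _ in range(8)]
>>> head.statistical_initialization(stream)
>>> round(head.get_threshold(), 1)  # bias = mean + 2*std ~= 2.0 + 2*0.5; ~97.7% of Gaussian scores fall below
3.0
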
update
update(scores)

Update running statistics with a batch of scores.

Uses Welford's online algorithm for numerically stable computation.

Parameters:

Name Type Description Default
scores Tensor

Batch of RX scores in BHWC format

required
Source code in cuvis_ai/node/conversion.py
@torch.no_grad()
def update(self, scores: torch.Tensor) -> None:
    """Update running statistics with a batch of scores.

    Uses Welford's online algorithm for numerically stable computation.

    Parameters
    ----------
    scores : torch.Tensor
        Batch of RX scores in BHWC format
    """
    # Flatten to 1D
    X = scores.flatten()
    m = X.shape[0]
    if m <= 1:
        return

    mean_b = X.mean()
    M2_b = ((X - mean_b) ** 2).sum()

    n = int(self._n.item())
    if n == 0:
        self._n = torch.tensor(m, dtype=torch.long, device=self.scale.device)
        self._mean = mean_b
        self._M2 = M2_b
    else:
        tot = n + m
        delta = mean_b - self._mean
        new_mean = self._mean + delta * (m / tot)
        self._M2 = self._M2 + M2_b + (delta**2) * (n * m / tot)
        self._n = torch.tensor(tot, dtype=torch.long, device=self.scale.device)
        self._mean = new_mean
    self._statistically_initialized = False
finalize
finalize()

Finalize statistics and set bias to mean + 2*std.

This threshold (mean + 2*std) is a common heuristic for anomaly detection, capturing ~95% of normal data under Gaussian assumption.

Source code in cuvis_ai/node/conversion.py
@torch.no_grad()
def finalize(self) -> None:
    """Finalize statistics and set bias to mean + 2*std.

    This threshold (mean + 2*std) is a common heuristic for anomaly detection,
    capturing ~95% of normal data under Gaussian assumption.
    """
    if int(self._n.item()) <= 1:
        raise ValueError("Not enough samples to finalize ScoreToLogit statistics.")

    mean = self._mean.clone()
    variance = self._M2 / (self._n - 1)
    std = torch.sqrt(variance)

    # Set bias to mean + 2*std (threshold for anomalies)
    self.bias = mean + 2.0 * std
    self._statistically_initialized = True
reset
reset()

Reset all statistics and accumulators.

Source code in cuvis_ai/node/conversion.py
def reset(self) -> None:
    """Reset all statistics and accumulators."""
    self._n = torch.tensor(0, dtype=torch.long, device=self.scale.device)
    self._mean = torch.tensor(float("nan"))
    self._M2 = torch.tensor(float("nan"))
    self._statistically_initialized = False
forward
forward(scores, **_)

Transform RX scores to logits.

Parameters:

Name Type Description Default
scores Tensor

Input RX scores with shape (B, H, W, 1)

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "logits" key containing transformed scores

Source code in cuvis_ai/node/conversion.py
def forward(self, scores: torch.Tensor, **_) -> dict[str, torch.Tensor]:
    """Transform RX scores to logits.

    Parameters
    ----------
    scores : torch.Tensor
        Input RX scores with shape (B, H, W, 1)

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "logits" key containing transformed scores
    """

    if not self._statistically_initialized:
        raise RuntimeError(
            "ScoreToLogit not initialized. Call statistical_initialization() before forward()."
        )
    # Apply affine transformation: logit = scale * (score - bias)
    logits = self.scale * (scores - self.bias)

    return {"logits": logits}
get_threshold
get_threshold()

Get the current anomaly threshold (bias value).

Returns:

Type Description
float

Current threshold value

Source code in cuvis_ai/node/conversion.py
def get_threshold(self) -> float:
    """Get the current anomaly threshold (bias value).

    Returns
    -------
    float
        Current threshold value
    """
    return self.bias.item()
set_threshold
set_threshold(threshold)

Set the anomaly threshold (bias value).

Parameters:

Name Type Description Default
threshold float

New threshold value

required
Source code in cuvis_ai/node/conversion.py
def set_threshold(self, threshold: float) -> None:
    """Set the anomaly threshold (bias value).

    Parameters
    ----------
    threshold : float
        New threshold value
    """
    with torch.no_grad():
        self.bias.fill_(threshold)
predict_anomalies
predict_anomalies(logits)

Convert logits to binary anomaly predictions.

Parameters:

Name Type Description Default
logits Tensor

Logits from forward pass, shape (B, H, W, 1)

required

Returns:

Type Description
Tensor

Binary predictions (0=normal, 1=anomaly), shape (B, H, W, 1)

Source code in cuvis_ai/node/conversion.py
def predict_anomalies(self, logits: torch.Tensor) -> torch.Tensor:
    """Convert logits to binary anomaly predictions.

    Parameters
    ----------
    logits : torch.Tensor
        Logits from forward pass, shape (B, H, W, 1)

    Returns
    -------
    torch.Tensor
        Binary predictions (0=normal, 1=anomaly), shape (B, H, W, 1)
    """
    return (logits > 0).float()
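
Putting the threshold helpers together, a minimal end-to-end sketch (the defaults leave the head usable without statistical initialization, per __init__ above):

>>> import torch
>>> head = ScoreToLogit(init_scale=1.0, init_bias=5.0)
>>> head.set_threshold(4.0)
>>> head.get_threshold()
4.0
>>> scores = torch.tensor([[[[3.0]], [[6.0]]]])        # [1, 2, 1, 1]
>>> logits = head.forward(scores=scores)["logits"]     # scale * (score - bias)
>>> head.predict_anomalies(logits).squeeze().tolist()
[0.0, 1.0]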

Channel & Band Selection Nodes

Nodes for selecting and transforming spectral channels.

Band Selection

band_selection

Band selection nodes for HSI to RGB conversion.

This module provides port-based nodes for selecting spectral bands from hyperspectral cubes and composing RGB images for downstream processing (e.g., with AdaCLIP).

BandSelectorBase

Bases: Node

Base class for hyperspectral band selection strategies.

This base class defines the common input/output ports for band selection nodes. Subclasses should implement the forward() method to perform specific band selection strategies.

Ports

INPUT_SPECS
  cube : float32, shape (-1, -1, -1, -1)
    Hyperspectral cube in BHWC format.
  wavelengths : float32, shape (-1,)
    Wavelength array in nanometers.
OUTPUT_SPECS
  rgb_image : float32, shape (-1, -1, -1, 3)
    Composed RGB image in BHWC format (0-1 range).
  band_info : dict
    Metadata about selected bands.

BaselineFalseRGBSelector
BaselineFalseRGBSelector(
    target_wavelengths=(650.0, 550.0, 450.0), **kwargs
)

Bases: BandSelectorBase

Fixed wavelength band selection (e.g., 650, 550, 450 nm).

Selects bands nearest to the specified target wavelengths for R, G, B channels. This is the simplest band selection strategy that produces "true color-ish" images.

Parameters:

Name Type Description Default
target_wavelengths tuple[float, float, float]

Target wavelengths for R, G, B channels in nanometers. Default: (650.0, 550.0, 450.0)

(650.0, 550.0, 450.0)
Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    target_wavelengths: tuple[float, float, float] = (650.0, 550.0, 450.0),
    **kwargs,
) -> None:
    super().__init__(target_wavelengths=target_wavelengths, **kwargs)
    self.target_wavelengths = target_wavelengths
forward
forward(cube, wavelengths, context=None, **_)

Select bands and compose RGB image.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
wavelengths Tensor

Wavelength array [C].

required

Returns:

Type Description
dict[str, Any]

Dictionary with "rgb_image" and "band_info" keys.

Source code in cuvis_ai/node/band_selection.py
def forward(
    self,
    cube: torch.Tensor,
    wavelengths: Any,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Select bands and compose RGB image.

    Parameters
    ----------
    cube : torch.Tensor
        Hyperspectral cube [B, H, W, C].
    wavelengths : torch.Tensor
        Wavelength array [C].

    Returns
    -------
    dict[str, Any]
        Dictionary with "rgb_image" and "band_info" keys.
    """
    wavelengths_np = np.asarray(wavelengths, dtype=np.float32)

    # Find nearest bands
    indices = [self._nearest_band_index(wavelengths_np, nm) for nm in self.target_wavelengths]

    # Compose RGB
    rgb = self._compose_rgb(cube, indices)

    band_info = {
        "strategy": "baseline_false_rgb",
        "band_indices": indices,
        "band_wavelengths_nm": [float(wavelengths_np[i]) for i in indices],
        "target_wavelengths_nm": list(self.target_wavelengths),
    }

    return {"rgb_image": rgb, "band_info": band_info}
HighContrastBandSelector
HighContrastBandSelector(
    windows=((440, 500), (500, 580), (610, 700)),
    alpha=0.1,
    **kwargs,
)

Bases: BandSelectorBase

Data-driven band selection using spatial variance + Laplacian energy.

For each wavelength window, selects the band with the highest score based on: score = variance + alpha * Laplacian_energy

This produces "high contrast" images that may work better for visual anomaly detection.

Parameters:

Name Type Description Default
windows Sequence[tuple[float, float]]

Wavelength windows for Blue, Green, Red channels. Default: ((440, 500), (500, 580), (610, 700)) for visible spectrum.

((440, 500), (500, 580), (610, 700))
alpha float

Weight for Laplacian energy term. Default: 0.1

0.1
Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    windows: Sequence[tuple[float, float]] = ((440, 500), (500, 580), (610, 700)),
    alpha: float = 0.1,
    **kwargs,
) -> None:
    super().__init__(windows=windows, alpha=alpha, **kwargs)
    self.windows = list(windows)
    self.alpha = alpha
forward
forward(cube, wavelengths, context=None, **_)

Select high-contrast bands and compose RGB image.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
wavelengths Tensor

Wavelength array [C].

required

Returns:

Type Description
dict[str, Any]

Dictionary with "rgb_image" and "band_info" keys.

Source code in cuvis_ai/node/band_selection.py
def forward(
    self,
    cube: torch.Tensor,
    wavelengths: Any,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Select high-contrast bands and compose RGB image.

    Parameters
    ----------
    cube : torch.Tensor
        Hyperspectral cube [B, H, W, C].
    wavelengths : torch.Tensor
        Wavelength array [C].

    Returns
    -------
    dict[str, Any]
        Dictionary with "rgb_image" and "band_info" keys.
    """
    wavelengths_np = np.asarray(wavelengths, dtype=np.float32)
    # Use first batch item for band selection
    cube_np = cube[0].cpu().numpy()

    selected_indices = []
    for start, end in self.windows:
        mask = (wavelengths_np >= start) & (wavelengths_np <= end)
        window_indices = np.where(mask)[0]

        if len(window_indices) == 0:
            # Fallback to nearest single wavelength
            nearest = self._nearest_band_index(wavelengths_np, (start + end) / 2.0)
            selected_indices.append(int(nearest))
            continue

        scores = []
        for idx in window_indices:
            band = cube_np[..., idx]
            variance = float(np.var(band))
            lap_energy = float(np.mean(np.abs(laplace(band))))
            scores.append(variance + self.alpha * lap_energy)

        best_idx = int(window_indices[int(np.argmax(scores))])
        selected_indices.append(best_idx)

    rgb = self._compose_rgb(cube, selected_indices)

    band_info = {
        "strategy": "high_contrast",
        "band_indices": selected_indices,
        "band_wavelengths_nm": [float(wavelengths_np[i]) for i in selected_indices],
        "windows_nm": [[float(s), float(e)] for s, e in self.windows],
        "alpha": self.alpha,
    }

    return {"rgb_image": rgb, "band_info": band_info}
CIRFalseColorSelector
CIRFalseColorSelector(
    nir_nm=860.0, red_nm=670.0, green_nm=560.0, **kwargs
)

Bases: BandSelectorBase

Color Infrared (CIR) false color composition.

Maps NIR to Red, Red to Green, Green to Blue for false-color composites. This is useful for highlighting vegetation and certain anomalies.

Parameters:

Name Type Description Default
nir_nm float

Near-infrared wavelength in nm. Default: 860.0

860.0
red_nm float

Red wavelength in nm. Default: 670.0

670.0
green_nm float

Green wavelength in nm. Default: 560.0

560.0
Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    nir_nm: float = 860.0,
    red_nm: float = 670.0,
    green_nm: float = 560.0,
    **kwargs,
) -> None:
    super().__init__(nir_nm=nir_nm, red_nm=red_nm, green_nm=green_nm, **kwargs)
    self.nir_nm = nir_nm
    self.red_nm = red_nm
    self.green_nm = green_nm
forward
forward(cube, wavelengths, context=None, **_)

Select CIR bands and compose false-color image.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
wavelengths Tensor

Wavelength array [C].

required

Returns:

Type Description
dict[str, Any]

Dictionary with "rgb_image" and "band_info" keys.

Source code in cuvis_ai/node/band_selection.py
def forward(
    self,
    cube: torch.Tensor,
    wavelengths: Any,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Select CIR bands and compose false-color image.

    Parameters
    ----------
    cube : torch.Tensor
        Hyperspectral cube [B, H, W, C].
    wavelengths : torch.Tensor
        Wavelength array [C].

    Returns
    -------
    dict[str, Any]
        Dictionary with "rgb_image" and "band_info" keys.
    """
    wavelengths_np = np.asarray(wavelengths, dtype=np.float32)

    # CIR mapping: NIR -> R, Red -> G, Green -> B
    nir_idx = self._nearest_band_index(wavelengths_np, self.nir_nm)
    red_idx = self._nearest_band_index(wavelengths_np, self.red_nm)
    green_idx = self._nearest_band_index(wavelengths_np, self.green_nm)

    indices = [nir_idx, red_idx, green_idx]
    rgb = self._compose_rgb(cube, indices)

    band_info = {
        "strategy": "cir_false_color",
        "band_indices": indices,
        "band_wavelengths_nm": [float(wavelengths_np[i]) for i in indices],
        "target_wavelengths_nm": [self.nir_nm, self.red_nm, self.green_nm],
        "channel_mapping": {"R": "NIR", "G": "Red", "B": "Green"},
    }

    return {"rgb_image": rgb, "band_info": band_info}
SupervisedBandSelectorBase
SupervisedBandSelectorBase(
    num_spectral_bands,
    score_weights=(1.0, 1.0, 1.0),
    lambda_penalty=0.5,
    **kwargs,
)

Bases: BandSelectorBase

Base class for supervised band selection strategies.

This class adds an optional mask input port and implements common logic for statistical initialization via fit().

The mask is assumed to be binary (0/1), where 1 denotes the positive class (e.g. stone) and 0 denotes the negative class (e.g. lentil/background).

Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    num_spectral_bands: int,
    score_weights: tuple[float, float, float] = (1.0, 1.0, 1.0),
    lambda_penalty: float = 0.5,
    **kwargs: Any,
) -> None:
    # Call super().__init__ FIRST so Serializable captures hparams correctly
    super().__init__(
        num_spectral_bands=num_spectral_bands,
        score_weights=score_weights,
        lambda_penalty=lambda_penalty,
        **kwargs,
    )
    # Then set instance attributes
    self.num_spectral_bands = num_spectral_bands
    self.score_weights = score_weights
    self.lambda_penalty = lambda_penalty
    # Initialize buffers with correct shapes (not empty)
    # selected_indices: always 3 for RGB
    # score buffers: num_spectral_bands
    self.register_buffer("selected_indices", torch.zeros(3, dtype=torch.long), persistent=True)
    self.register_buffer(
        "band_scores", torch.zeros(num_spectral_bands, dtype=torch.float32), persistent=True
    )
    self.register_buffer(
        "fisher_scores", torch.zeros(num_spectral_bands, dtype=torch.float32), persistent=True
    )
    self.register_buffer(
        "auc_scores", torch.zeros(num_spectral_bands, dtype=torch.float32), persistent=True
    )
    self.register_buffer(
        "mi_scores", torch.zeros(num_spectral_bands, dtype=torch.float32), persistent=True
    )
    # Use standard instance attribute for initialization tracking
    self._statistically_initialized = False
requires_initial_fit property
requires_initial_fit

Whether this node requires statistical initialization from training data.

Returns:

Type Description
bool

Always True for supervised band selectors.

SupervisedCIRBandSelector
SupervisedCIRBandSelector(
    windows=(
        (840.0, 910.0),
        (650.0, 720.0),
        (500.0, 570.0),
    ),
    score_weights=(1.0, 1.0, 1.0),
    lambda_penalty=0.5,
    **kwargs,
)

Bases: SupervisedBandSelectorBase

Supervised CIR/NIR band selection with window constraints.

Windows are typically set to:

- NIR: 840-910 nm
- Red: 650-720 nm
- Green: 500-570 nm

The selector chooses one band per window using a supervised score (Fisher + AUC + MI) with an mRMR-style redundancy penalty.

Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    windows: Sequence[tuple[float, float]] = ((840.0, 910.0), (650.0, 720.0), (500.0, 570.0)),
    score_weights: tuple[float, float, float] = (1.0, 1.0, 1.0),
    lambda_penalty: float = 0.5,
    **kwargs: Any,
) -> None:
    # Call super().__init__ FIRST so Serializable captures hparams correctly
    super().__init__(
        score_weights=score_weights,
        lambda_penalty=lambda_penalty,
        windows=list(windows),
        **kwargs,
    )
    # Then set instance attributes
    self.windows = list(windows)
statistical_initialization
statistical_initialization(input_stream)

Initialize band selection using supervised scoring with CIR windows.

Computes Fisher, AUC, and MI scores for each band, applies mRMR selection within CIR-specific wavelength windows, and stores the 3 selected bands.

Parameters:

Name Type Description Default
input_stream InputStream

Training data stream with cube, mask, and wavelengths.

required

Raises:

Type Description
ValueError

If the mRMR selection doesn't return exactly 3 bands.

Source code in cuvis_ai/node/band_selection.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize band selection using supervised scoring with CIR windows.

    Computes Fisher, AUC, and MI scores for each band, applies mRMR selection
    within CIR-specific wavelength windows, and stores the 3 selected bands.

    Parameters
    ----------
    input_stream : InputStream
        Training data stream with cube, mask, and wavelengths.

    Raises
    ------
    ValueError
        If the mRMR selection doesn't return exactly 3 bands.
    """
    cubes, masks, wavelengths = self._collect_training_data(input_stream)
    band_scores, fisher_scores, auc_scores, mi_scores = _compute_band_scores_supervised(
        cubes,
        masks,
        wavelengths,
        self.score_weights,
    )
    corr_matrix = _compute_band_correlation_matrix(cubes, len(wavelengths))
    selected_indices = _mrmr_band_selection(
        band_scores,
        wavelengths,
        self.windows,
        corr_matrix,
        self.lambda_penalty,
    )
    if len(selected_indices) != 3:
        raise ValueError(
            f"SupervisedCIRBandSelector expected 3 bands, got {len(selected_indices)}"
        )
    self._store_scores_and_indices(
        band_scores, fisher_scores, auc_scores, mi_scores, selected_indices
    )
forward
forward(cube, wavelengths, mask=None, context=None, **_)

Generate false-color RGB from selected CIR bands.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
wavelengths ndarray

Wavelengths for each channel [C].

required
mask Tensor

Ground truth mask (unused in forward, required for initialization).

None
context Context

Pipeline execution context (unused).

None
**_ Any

Additional unused keyword arguments.

{}

Returns:

Type Description
dict[str, Any]

Dictionary with "rgb_image" [B, H, W, 3] and "band_info" metadata.

Raises:

Type Description
RuntimeError

If the node has not been statistically initialized.

Source code in cuvis_ai/node/band_selection.py
def forward(
    self,
    cube: torch.Tensor,
    wavelengths: np.ndarray,
    mask: torch.Tensor | None = None,  # noqa: ARG002
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Generate false-color RGB from selected CIR bands.

    Parameters
    ----------
    cube : torch.Tensor
        Hyperspectral cube [B, H, W, C].
    wavelengths : np.ndarray
        Wavelengths for each channel [C].
    mask : torch.Tensor, optional
        Ground truth mask (unused in forward, required for initialization).
    context : Context, optional
        Pipeline execution context (unused).
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, Any]
        Dictionary with "rgb_image" [B, H, W, 3] and "band_info" metadata.

    Raises
    ------
    RuntimeError
        If the node has not been statistically initialized.
    """
    if not self._statistically_initialized or self.selected_indices.numel() != 3:
        raise RuntimeError("SupervisedCIRBandSelector not fitted")

    wavelengths_np = np.asarray(wavelengths, dtype=np.float32)
    indices = self.selected_indices.tolist()
    rgb = self._compose_rgb(cube, indices)

    band_info = {
        "strategy": "supervised_cir",
        "band_indices": indices,
        "band_wavelengths_nm": [float(wavelengths_np[i]) for i in indices],
        "windows_nm": [[float(s), float(e)] for s, e in self.windows],
        "score_weights": list(self.score_weights),
        "lambda_penalty": float(self.lambda_penalty),
    }
    return {"rgb_image": rgb, "band_info": band_info}
SupervisedWindowedFalseRGBSelector
SupervisedWindowedFalseRGBSelector(
    windows=(
        (440.0, 500.0),
        (500.0, 580.0),
        (610.0, 700.0),
    ),
    score_weights=(1.0, 1.0, 1.0),
    lambda_penalty=0.5,
    **kwargs,
)

Bases: SupervisedBandSelectorBase

Supervised band selection constrained to visible RGB windows.

Similar to HighContrastBandSelector, but uses label-driven scores. Default windows:

- Blue: 440–500 nm
- Green: 500–580 nm
- Red: 610–700 nm

Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    windows: Sequence[tuple[float, float]] = ((440.0, 500.0), (500.0, 580.0), (610.0, 700.0)),
    score_weights: tuple[float, float, float] = (1.0, 1.0, 1.0),
    lambda_penalty: float = 0.5,
    **kwargs: Any,
) -> None:
    # Call super().__init__ FIRST so Serializable captures hparams correctly
    super().__init__(
        score_weights=score_weights,
        lambda_penalty=lambda_penalty,
        windows=list(windows),
        **kwargs,
    )
    # Then set instance attributes
    self.windows = list(windows)
statistical_initialization
statistical_initialization(input_stream)

Initialize band selection using supervised scoring with RGB windows.

Computes Fisher, AUC, and MI scores for each band, applies mRMR selection within RGB wavelength windows (blue/green/red), and stores the 3 selected bands.

Parameters:

Name Type Description Default
input_stream InputStream

Training data stream with cube, mask, and wavelengths.

required

Raises:

Type Description
ValueError

If the mRMR selection doesn't return exactly 3 bands.

Source code in cuvis_ai/node/band_selection.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize band selection using supervised scoring with RGB windows.

    Computes Fisher, AUC, and MI scores for each band, applies mRMR selection
    within RGB wavelength windows (blue/green/red), and stores the 3 selected bands.

    Parameters
    ----------
    input_stream : InputStream
        Training data stream with cube, mask, and wavelengths.

    Raises
    ------
    ValueError
        If the mRMR selection doesn't return exactly 3 bands.
    """
    cubes, masks, wavelengths = self._collect_training_data(input_stream)
    band_scores, fisher_scores, auc_scores, mi_scores = _compute_band_scores_supervised(
        cubes,
        masks,
        wavelengths,
        self.score_weights,
    )
    corr_matrix = _compute_band_correlation_matrix(cubes, len(wavelengths))
    selected_indices = _mrmr_band_selection(
        band_scores,
        wavelengths,
        self.windows,
        corr_matrix,
        self.lambda_penalty,
    )
    if len(selected_indices) != 3:
        raise ValueError(
            f"SupervisedWindowedFalseRGBSelector expected 3 bands, got {len(selected_indices)}",
        )
    self._store_scores_and_indices(
        band_scores, fisher_scores, auc_scores, mi_scores, selected_indices
    )
forward
forward(cube, wavelengths, mask=None, context=None, **_)

Generate false-color RGB from selected windowed bands.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
wavelengths ndarray

Wavelengths for each channel [C].

required
mask Tensor

Ground truth mask (unused in forward, required for initialization).

None
context Context

Pipeline execution context (unused).

None
**_ Any

Additional unused keyword arguments.

{}

Returns:

Type Description
dict[str, Any]

Dictionary with "rgb_image" [B, H, W, 3] and "band_info" metadata.

Raises:

Type Description
RuntimeError

If the node has not been statistically initialized.

Source code in cuvis_ai/node/band_selection.py
def forward(
    self,
    cube: torch.Tensor,
    wavelengths: np.ndarray,
    mask: torch.Tensor | None = None,  # noqa: ARG002
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Generate false-color RGB from selected windowed bands.

    Parameters
    ----------
    cube : torch.Tensor
        Hyperspectral cube [B, H, W, C].
    wavelengths : np.ndarray
        Wavelengths for each channel [C].
    mask : torch.Tensor, optional
        Ground truth mask (unused in forward, required for initialization).
    context : Context, optional
        Pipeline execution context (unused).
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, Any]
        Dictionary with "rgb_image" [B, H, W, 3] and "band_info" metadata.

    Raises
    ------
    RuntimeError
        If the node has not been statistically initialized.
    """
    if not self._statistically_initialized or self.selected_indices.numel() != 3:
        raise RuntimeError("SupervisedWindowedFalseRGBSelector not fitted")

    wavelengths_np = np.asarray(wavelengths, dtype=np.float32)
    indices = self.selected_indices.tolist()
    rgb = self._compose_rgb(cube, indices)

    band_info = {
        "strategy": "supervised_windowed_false_rgb",
        "band_indices": indices,
        "band_wavelengths_nm": [float(wavelengths_np[i]) for i in indices],
        "windows_nm": [[float(s), float(e)] for s, e in self.windows],
        "score_weights": list(self.score_weights),
        "lambda_penalty": float(self.lambda_penalty),
    }
    return {"rgb_image": rgb, "band_info": band_info}
SupervisedFullSpectrumBandSelector
SupervisedFullSpectrumBandSelector(
    score_weights=(1.0, 1.0, 1.0),
    lambda_penalty=0.5,
    **kwargs,
)

Bases: SupervisedBandSelectorBase

Supervised selection without window constraints.

Picks the top-3 discriminative bands globally with an mRMR-style redundancy penalty applied over the full spectrum.

Source code in cuvis_ai/node/band_selection.py
def __init__(
    self,
    score_weights: tuple[float, float, float] = (1.0, 1.0, 1.0),
    lambda_penalty: float = 0.5,
    **kwargs: Any,
) -> None:
    super().__init__(score_weights=score_weights, lambda_penalty=lambda_penalty, **kwargs)
statistical_initialization
statistical_initialization(input_stream)

Initialize band selection using supervised scoring across full spectrum.

Computes Fisher, AUC, and MI scores for each band, applies mRMR selection globally without wavelength window constraints, and stores the 3 selected bands.

Parameters:

Name Type Description Default
input_stream InputStream

Training data stream with cube, mask, and wavelengths.

required

Raises:

Type Description
ValueError

If the mRMR selection doesn't return exactly 3 bands.

Source code in cuvis_ai/node/band_selection.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize band selection using supervised scoring across full spectrum.

    Computes Fisher, AUC, and MI scores for each band, applies mRMR selection
    globally without wavelength window constraints, and stores the 3 selected bands.

    Parameters
    ----------
    input_stream : InputStream
        Training data stream with cube, mask, and wavelengths.

    Raises
    ------
    ValueError
        If the mRMR selection doesn't return exactly 3 bands.
    """
    cubes, masks, wavelengths = self._collect_training_data(input_stream)
    band_scores, fisher_scores, auc_scores, mi_scores = _compute_band_scores_supervised(
        cubes,
        masks,
        wavelengths,
        self.score_weights,
    )
    corr_matrix = _compute_band_correlation_matrix(cubes, len(wavelengths))
    selected_indices = _select_top_k_bands(
        band_scores,
        corr_matrix,
        k=3,
        lambda_penalty=self.lambda_penalty,
    )
    if len(selected_indices) != 3:
        raise ValueError(
            f"SupervisedFullSpectrumBandSelector expected 3 bands, got {len(selected_indices)}",
        )
    self._store_scores_and_indices(
        band_scores, fisher_scores, auc_scores, mi_scores, selected_indices
    )
forward
forward(cube, wavelengths, mask=None, context=None, **_)

Generate false-color RGB from globally selected bands.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
wavelengths ndarray

Wavelengths for each channel [C].

required
mask Tensor

Ground truth mask (unused in forward, required for initialization).

None
context Context

Pipeline execution context (unused).

None
**_ Any

Additional unused keyword arguments.

{}

Returns:

Type Description
dict[str, Any]

Dictionary with "rgb_image" [B, H, W, 3] and "band_info" metadata.

Raises:

Type Description
RuntimeError

If the node has not been statistically initialized.

Source code in cuvis_ai/node/band_selection.py
def forward(
    self,
    cube: torch.Tensor,
    wavelengths: np.ndarray,
    mask: torch.Tensor | None = None,  # noqa: ARG002
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Generate false-color RGB from globally selected bands.

    Parameters
    ----------
    cube : torch.Tensor
        Hyperspectral cube [B, H, W, C].
    wavelengths : np.ndarray
        Wavelengths for each channel [C].
    mask : torch.Tensor, optional
        Ground truth mask (unused in forward, required for initialization).
    context : Context, optional
        Pipeline execution context (unused).
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, Any]
        Dictionary with "rgb_image" [B, H, W, 3] and "band_info" metadata.

    Raises
    ------
    RuntimeError
        If the node has not been statistically initialized.
    """
    if not self._statistically_initialized or self.selected_indices.numel() != 3:
        raise RuntimeError("SupervisedFullSpectrumBandSelector not fitted")

    wavelengths_np = np.asarray(wavelengths, dtype=np.float32)
    indices = self.selected_indices.tolist()
    rgb = self._compose_rgb(cube, indices)

    band_info = {
        "strategy": "supervised_full_spectrum",
        "band_indices": indices,
        "band_wavelengths_nm": [float(wavelengths_np[i]) for i in indices],
        "score_weights": list(self.score_weights),
        "lambda_penalty": float(self.lambda_penalty),
    }
    return {"rgb_image": rgb, "band_info": band_info}

Channel Mixer

channel_mixer

Learnable channel mixer node for DRCNN-style spectral data reduction.

This module implements a learnable channel mixer based on the Data Reduction CNN (DRCNN) approach from Zeegers et al. (2020). The mixer performs spectral pixel-wise 1x1 convolutions to reduce hyperspectral data to a smaller number of channels (e.g., 61 → 3 for RGB compatibility).

Reference: Zeegers et al., "Task-Driven Learned Hyperspectral Data Reduction Using End-to-End Supervised Deep Learning," J. Imaging 6(12):132, 2020.

LearnableChannelMixer
LearnableChannelMixer(
    input_channels,
    output_channels,
    leaky_relu_negative_slope=0.01,
    use_bias=True,
    use_activation=True,
    normalize_output=True,
    init_method="xavier",
    eps=1e-06,
    reduction_scheme=None,
    **kwargs,
)

Bases: Node

Learnable channel mixer for hyperspectral data reduction (DRCNN-style).

This node implements a learnable linear combination layer that reduces the number of spectral channels through spectral pixel-wise 1x1 convolutions. Based on the DRCNN approach, it uses:

- 1x1 convolution (linear combination across the spectral dimension)
- Leaky ReLU activation (a=0.01)
- Bias parameters
- Optional PCA-based initialization

The mixer is designed to be trained end-to-end with a downstream model (e.g., AdaClip) while keeping the downstream model frozen. This allows the mixer to learn optimal spectral combinations for the specific task.

Parameters:

Name Type Description Default
input_channels int

Number of input spectral channels (e.g., 61 for hyperspectral cube)

required
output_channels int

Number of output channels (e.g., 3 for RGB compatibility)

required
leaky_relu_negative_slope float

Negative slope for Leaky ReLU activation (default: 0.01, as per DRCNN paper)

0.01
use_bias bool

Whether to use bias parameters (default: True, as per DRCNN paper)

True
use_activation bool

Whether to apply Leaky ReLU activation (default: True, as per DRCNN paper)

True
normalize_output bool

Whether to apply per-channel min-max normalization to [0, 1] range (default: True). This matches the behavior of band selectors and ensures compatibility with AdaClip. When True, each output channel is normalized independently using per-batch statistics.

True
init_method ('xavier', 'kaiming', 'pca', 'zeros')

Weight initialization method (default: "xavier"):

- "xavier": Xavier/Glorot uniform initialization
- "kaiming": Kaiming/He uniform initialization
- "pca": Initialize from PCA components (requires statistical_initialization)
- "zeros": Zero initialization (weights and bias start at zero)

"xavier"
eps float

Small constant for numerical stability (default: 1e-6)

1e-06
reduction_scheme list[int] | None

Multi-layer reduction scheme for gradual channel reduction (default: None). If None, uses single-layer reduction (input_channels → output_channels). If provided, must start with input_channels and end with output_channels. Example: [61, 16, 8, 3] means:

- Layer 1: 61 → 16 channels
- Layer 2: 16 → 8 channels
- Layer 3: 8 → 3 channels

This matches the DRCNN paper's multi-layer architecture for better optimization.

None

Attributes:

Name Type Description
conv Conv2d

1x1 convolutional layer performing spectral mixing

activation LeakyReLU or None

Leaky ReLU activation function (if use_activation=True)

Examples:

>>> # Create mixer: 61 channels → 3 channels (single-layer)
>>> mixer = LearnableChannelMixer(
...     input_channels=61,
...     output_channels=3,
...     leaky_relu_negative_slope=0.01,
...     init_method="xavier"
... )
>>>
>>> # Create mixer with multi-layer reduction (matches DRCNN paper)
>>> mixer = LearnableChannelMixer(
...     input_channels=61,
...     output_channels=3,
...     reduction_scheme=[61, 16, 8, 3],  # Gradual reduction
...     leaky_relu_negative_slope=0.01,
...     init_method="xavier"
... )
>>>
>>> # Optional: Initialize from PCA
>>> # mixer.statistical_initialization(input_stream)
>>>
>>> # Enable gradient training
>>> mixer.unfreeze()
>>>
>>> # Forward pass: [B, H, W, 61] → [B, H, W, 3]
>>> output = mixer.forward(data=hsi_cube)
>>> rgb_like = output["rgb"]  # [B, H, W, 3]
Source code in cuvis_ai/node/channel_mixer.py
def __init__(
    self,
    input_channels: int,
    output_channels: int,
    leaky_relu_negative_slope: float = 0.01,
    use_bias: bool = True,
    use_activation: bool = True,
    normalize_output: bool = True,
    init_method: Literal["xavier", "kaiming", "pca", "zeros"] = "xavier",
    eps: float = 1e-6,
    reduction_scheme: list[int] | None = None,
    **kwargs,
) -> None:
    self.input_channels = input_channels
    self.output_channels = output_channels
    self.leaky_relu_negative_slope = leaky_relu_negative_slope
    self.use_bias = use_bias
    self.use_activation = use_activation
    self.normalize_output = normalize_output
    self.init_method = init_method
    self.eps = eps

    # Determine reduction scheme: if None, use single-layer (backward compatible)
    # If provided, use multi-layer gradual reduction (e.g., [61, 16, 8, 3])
    if reduction_scheme is None:
        reduction_scheme = [input_channels, output_channels]
    else:
        # Validate reduction scheme
        if reduction_scheme[0] != input_channels:
            raise ValueError(
                f"First element of reduction_scheme must match input_channels: "
                f"got {reduction_scheme[0]}, expected {input_channels}"
            )
        if reduction_scheme[-1] != output_channels:
            raise ValueError(
                f"Last element of reduction_scheme must match output_channels: "
                f"got {reduction_scheme[-1]}, expected {output_channels}"
            )
        if len(reduction_scheme) < 2:
            raise ValueError(
                f"reduction_scheme must have at least 2 elements, got {len(reduction_scheme)}"
            )

    self.reduction_scheme = reduction_scheme
    self.num_layers = len(reduction_scheme) - 1  # Number of reduction layers

    super().__init__(
        input_channels=input_channels,
        output_channels=output_channels,
        leaky_relu_negative_slope=leaky_relu_negative_slope,
        use_bias=use_bias,
        use_activation=use_activation,
        normalize_output=normalize_output,
        init_method=init_method,
        eps=eps,
        reduction_scheme=reduction_scheme,
        **kwargs,
    )

    # Create multi-layer reduction architecture (as per DRCNN paper)
    # Each layer performs: C_in → C_out reduction via 1x1 convolution
    self.convs = nn.ModuleList()
    for i in range(self.num_layers):
        in_ch = reduction_scheme[i]
        out_ch = reduction_scheme[i + 1]
        conv = nn.Conv2d(
            in_channels=in_ch,
            out_channels=out_ch,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=use_bias,
        )
        self.convs.append(conv)

    # Leaky ReLU activation (as per DRCNN paper)
    # Note: Leaky ReLU with a=0.01 can be very aggressive, killing most negative values
    # Consider using a higher value (e.g., 0.1) or removing activation if issues occur
    if use_activation:
        self.activation = nn.LeakyReLU(negative_slope=leaky_relu_negative_slope)
    else:
        self.activation = None

    # Initialize weights based on method
    self._initialize_weights()

    # Track initialization state
    self._statistically_initialized = False
requires_initial_fit property
requires_initial_fit

Whether this node requires statistical initialization.

statistical_initialization
statistical_initialization(input_stream)

Initialize mixer weights from PCA components.

This method computes PCA on the input data and initializes the mixer weights to the top principal components. This provides a good starting point for gradient-based optimization.

Parameters:

Name Type Description Default
input_stream InputStream

Iterator yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"data": tensor} where tensor is [B, H, W, C_in].

required
Notes

This method is only used when init_method="pca". For other initialization methods, weights are set in __init__.

Source code in cuvis_ai/node/channel_mixer.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize mixer weights from PCA components.

    This method computes PCA on the input data and initializes the mixer weights
    to the top principal components. This provides a good starting point for
    gradient-based optimization.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is [B, H, W, C_in]

    Notes
    -----
    This method is only used when init_method="pca". For other initialization
    methods, weights are set in __init__.
    """
    if self.init_method != "pca":
        return  # No statistical initialization needed

    # Collect all data for PCA
    all_data = []
    for batch_data in input_stream:
        x = batch_data["data"]
        if x is not None:
            # Flatten spatial dimensions: [B, H, W, C] -> [B*H*W, C]
            flat = x.reshape(-1, x.shape[-1])
            all_data.append(flat)

    if not all_data:
        raise ValueError("No data provided for PCA initialization")

    # Concatenate all samples
    X = torch.cat(all_data, dim=0)  # [N, C_in]

    # Compute mean and center data
    mean = X.mean(dim=0, keepdim=True)  # [1, C_in]
    X_centered = X - mean  # [N, C_in]

    # Compute SVD to get principal components
    # X_centered = U @ S @ V.T where V contains principal components
    U, S, Vt = torch.linalg.svd(X_centered, full_matrices=False)

    # For multi-layer, initialize only the first layer with PCA
    # Subsequent layers use xavier initialization (already done in _initialize_weights)
    first_layer_out_channels = self.reduction_scheme[1]

    # Extract top first_layer_out_channels components
    # Vt is [min(N, C_in), C_in], we want first first_layer_out_channels rows
    n_components = min(first_layer_out_channels, Vt.shape[0])
    components = Vt[:n_components, :].clone()  # [n_components, C_in]

    # If we need more output channels than components, pad with zeros
    if n_components < first_layer_out_channels:
        padding = torch.zeros(
            first_layer_out_channels - n_components,
            self.input_channels,
            device=components.device,
            dtype=components.dtype,
        )
        components = torch.cat([components, padding], dim=0)

    # Set weights for first layer: conv weight shape is [C_out, C_in, 1, 1]
    # We need to transpose components: [C_out, C_in]
    with torch.no_grad():
        self.convs[0].weight.data = components.view(
            first_layer_out_channels, self.input_channels, 1, 1
        )

    self._statistically_initialized = True
unfreeze
unfreeze()

Enable gradient-based training of mixer weights.

Call this method to allow gradient updates during training. The mixer weights and biases will be optimized via backpropagation.

Example

>>> mixer = LearnableChannelMixer(input_channels=61, output_channels=3)
>>> mixer.unfreeze()  # Enable gradient training
>>> # Now mixer weights can be optimized with gradient descent
Source code in cuvis_ai/node/channel_mixer.py
def unfreeze(self) -> None:
    """Enable gradient-based training of mixer weights.

    Call this method to allow gradient updates during training. The mixer
    weights and biases will be optimized via backpropagation.

    Example
    -------
    >>> mixer = LearnableChannelMixer(input_channels=61, output_channels=3)
    >>> mixer.unfreeze()  # Enable gradient training
    >>> # Now mixer weights can be optimized with gradient descent
    """
    # Ensure parameters require gradients for all layers
    for conv in self.convs:
        for param in conv.parameters():
            param.requires_grad = True
    # Call parent to enable requires_grad on the module
    super().unfreeze()
forward
forward(data, context=None, **_)

Apply learnable channel mixing to input.

Parameters:

Name Type Description Default
data Tensor

Input tensor [B, H, W, C_in] in BHWC format

required
context Context

Execution context with epoch, batch_idx, stage info

None

Returns:

Type Description
dict[str, Tensor]

Dictionary with "rgb" key containing reduced channels [B, H, W, C_out]

Source code in cuvis_ai/node/channel_mixer.py
def forward(self, data: Tensor, context: Context | None = None, **_: Any) -> dict[str, Tensor]:
    """Apply learnable channel mixing to input.

    Parameters
    ----------
    data : Tensor
        Input tensor [B, H, W, C_in] in BHWC format
    context : Context, optional
        Execution context with epoch, batch_idx, stage info

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "rgb" key containing reduced channels [B, H, W, C_out]
    """
    B, H, W, C_in = data.shape

    # DEBUG: Print input info
    if hasattr(self, "_debug") and self._debug:
        print(
            f"[LearnableChannelMixer] Input: shape={data.shape}, "
            f"min={data.min().item():.4f}, max={data.max().item():.4f}, "
            f"mean={data.mean().item():.4f}, requires_grad={data.requires_grad}"
        )

    # Validate input channels
    if C_in != self.input_channels:
        raise ValueError(
            f"Expected {self.input_channels} input channels, got {C_in}. "
            f"Input shape: {data.shape}"
        )

    # DEBUG disabled: previously saved input tensor here (_save_debug_tensor).
    # for b in range(B):
    #     self._save_debug_tensor(data[b], "input", context, frame_idx=b)

    # Convert from BHWC to BCHW for Conv2d
    data_bchw = data.permute(0, 3, 1, 2)  # [B, C_in, H, W]

    # Apply multi-layer reduction (as per DRCNN paper)
    # Each layer: 1x1 conv → Leaky ReLU (if enabled)
    mixed = data_bchw
    for i, conv in enumerate(self.convs):
        # Apply 1x1 convolution (spectral mixing)
        mixed = conv(mixed)  # [B, C_out_i, H, W]

        # Apply Leaky ReLU activation if enabled (except after last layer if we normalize)
        # For multi-layer, we apply activation after each layer except the last
        # The last layer's output will be normalized, so we skip activation there if normalize_output=True
        if self.activation is not None:
            if i < len(self.convs) - 1 or not self.normalize_output:
                mixed = self.activation(mixed)

    # Convert back from BCHW to BHWC
    mixed_bhwc = mixed.permute(0, 2, 3, 1)  # [B, H, W, C_out]

    # DEBUG disabled: previously saved output_before_norm tensor here (_save_debug_tensor).
    # for b in range(B):
    #     self._save_debug_tensor(mixed_bhwc[b], "output_before_norm", context, frame_idx=b)

    # Apply per-channel normalization to [0, 1] range (matching band selector behavior)
    # This ensures compatibility with AdaClip preprocessing
    if self.normalize_output:
        # Per-image, per-channel min/max normalization to [0, 1]
        # This ensures each image is normalized independently for visual consistency
        # Shape: [B, H, W, C_out]
        B_norm, H_norm, W_norm, C_norm = mixed_bhwc.shape
        # Reshape to [B, H*W, C] for easier per-image processing
        mixed_flat = mixed_bhwc.view(B_norm, H_norm * W_norm, C_norm)
        # Compute min/max per image, per channel: [B, 1, C]
        rgb_min = mixed_flat.amin(dim=1, keepdim=True)  # [B, 1, C]
        rgb_max = mixed_flat.amax(dim=1, keepdim=True)  # [B, 1, C]
        denom = (rgb_max - rgb_min).clamp_min(self.eps)
        # Normalize: [B, H*W, C]
        mixed_normalized = (mixed_flat - rgb_min) / denom
        # Reshape back and clamp
        mixed_bhwc = mixed_normalized.view(B_norm, H_norm, W_norm, C_norm).clamp_(0.0, 1.0)

    # DEBUG disabled: previously saved output_after_norm tensor here (_save_debug_tensor).
    # for b in range(B):
    #     self._save_debug_tensor(mixed_bhwc[b], "output_after_norm", context, frame_idx=b)

    # DEBUG: Print output info
    if hasattr(self, "_debug") and self._debug:
        print(
            f"[LearnableChannelMixer] Output: shape={mixed_bhwc.shape}, "
            f"min={mixed_bhwc.min().item():.4f}, max={mixed_bhwc.max().item():.4f}, "
            f"mean={mixed_bhwc.mean().item():.4f}, requires_grad={mixed_bhwc.requires_grad}"
        )

    return {"rgb": mixed_bhwc}

Concrete Selector

concrete_selector

Concrete/Gumbel-Softmax band selector node for hyperspectral data.

This module implements a learnable band selector using the Concrete / Gumbel-Softmax relaxation, suitable for end-to-end training with AdaClip.

The selector learns K categorical distributions over T input bands, and during training uses the Gumbel-Softmax trick to produce differentiable approximate one-hot selection weights that become increasingly peaked as the temperature τ is annealed.

For each output channel c ∈ {1, …, K}, we learn logits L_c ∈ ℝ^T and sample:

w_c = softmax((L_c + g) / τ),   g ~ Gumbel(0, 1)

The resulting weights are used to form K-channel RGB-like images:

Y[:, :, c] = Σ_{t=1..T} w_c[t] · X[:, :, t]

where X is the input hyperspectral cube in [0, 1].
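
A minimal sketch of the relaxation above in plain PyTorch (illustrative only; the node's internal Gumbel sampler is not shown on this page):

>>> import torch
>>> import torch.nn.functional as F
>>>
>>> K, T, tau, eps = 3, 61, 5.0, 1e-6
>>> logits = torch.zeros(K, T)                    # L_c, learnable in the node
>>> u = torch.rand(K, T).clamp(eps, 1.0 - eps)    # uniform noise
>>> g = -torch.log(-torch.log(u))                 # Gumbel(0, 1) samples
>>> w = F.softmax((logits + g) / tau, dim=-1)     # [K, T], each row sums to 1
>>> # As tau -> 0 the rows approach one-hot vectors (hard band selection)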

ConcreteBandSelector
ConcreteBandSelector(
    input_channels,
    output_channels=3,
    tau_start=10.0,
    tau_end=0.1,
    max_epochs=20,
    use_hard_inference=True,
    eps=1e-06,
    **kwargs,
)

Bases: Node

Concrete/Gumbel-Softmax band selector for hyperspectral cubes.

Parameters:

Name Type Description Default
input_channels int

Number of input spectral channels (e.g., 61 for hyperspectral cube).

required
output_channels int

Number of output channels (default: 3 for RGB/AdaClip compatibility).

3
tau_start float

Initial temperature for Gumbel-Softmax (default: 10.0).

10.0
tau_end float

Final temperature for Gumbel-Softmax (default: 0.1).

0.1
max_epochs int

Number of epochs over which to exponentially anneal τ from tau_start to tau_end (default: 20).

20
use_hard_inference bool

If True, uses hard argmax selection at inference/validation time (one-hot weights). If False, uses softmax over logits (default: True).

True
eps float

Small constant for numerical stability (default: 1e-6).

1e-06
Notes
  • During training (context.stage == 'train'), the node samples Gumbel noise and uses the Concrete relaxation with the current temperature τ(epoch).
  • During validation/test/inference, it uses deterministic weights without Gumbel noise.
  • The node exposes selection_weights so that repulsion penalties (e.g., DistinctnessLoss) can be attached in the pipeline.
Source code in cuvis_ai/node/concrete_selector.py
def __init__(
    self,
    input_channels: int,
    output_channels: int = 3,
    tau_start: float = 10.0,
    tau_end: float = 0.1,
    max_epochs: int = 20,
    use_hard_inference: bool = True,
    eps: float = 1e-6,
    **kwargs: Any,
) -> None:
    self.input_channels = int(input_channels)
    self.output_channels = int(output_channels)
    self.tau_start = float(tau_start)
    self.tau_end = float(tau_end)
    self.max_epochs = int(max_epochs)
    self.use_hard_inference = bool(use_hard_inference)
    self.eps = float(eps)

    if self.output_channels <= 0:
        raise ValueError(f"output_channels must be positive, got {output_channels}")
    if self.input_channels <= 0:
        raise ValueError(f"input_channels must be positive, got {input_channels}")
    if self.tau_start <= 0.0 or self.tau_end <= 0.0:
        raise ValueError("tau_start and tau_end must be positive.")

    super().__init__(
        input_channels=self.input_channels,
        output_channels=self.output_channels,
        tau_start=self.tau_start,
        tau_end=self.tau_end,
        max_epochs=self.max_epochs,
        use_hard_inference=self.use_hard_inference,
        eps=self.eps,
        **kwargs,
    )

    # Learnable logits for Categorical over input channels: [C_out, C_in]
    self.logits = nn.Parameter(torch.zeros(self.output_channels, self.input_channels))
get_selection_weights
get_selection_weights(deterministic=True)

Return current selection weights without data dependency.

Parameters:

Name Type Description Default
deterministic bool

If True, uses softmax over logits (no Gumbel noise) at a "midpoint" temperature (geometric mean of start/end). If False, uses current logits with tau_end.

True
Source code in cuvis_ai/node/concrete_selector.py
def get_selection_weights(self, deterministic: bool = True) -> Tensor:
    """Return current selection weights without data dependency.

    Parameters
    ----------
    deterministic : bool, optional
        If True, uses softmax over logits (no Gumbel noise) at a
        "midpoint" temperature (geometric mean of start/end). If False,
        uses current logits with ``tau_end``.
    """
    if deterministic:
        tau = math.sqrt(self.tau_start * self.tau_end)
    else:
        tau = self.tau_end

    return F.softmax(self.logits / tau, dim=-1)
get_selected_bands
get_selected_bands()

Return argmax band indices per output channel.

Source code in cuvis_ai/node/concrete_selector.py
def get_selected_bands(self) -> Tensor:
    """Return argmax band indices per output channel."""
    with torch.no_grad():
        return torch.argmax(self.logits, dim=-1)
forward
forward(data, context=None, **_)

Apply Concrete/Gumbel-Softmax band selection.

Parameters:

Name Type Description Default
data Tensor

Input tensor [B, H, W, C_in] in BHWC format.

required
context Context

Execution context with stage and epoch information.

None

Returns:

Type Description
dict[str, Tensor]

Dictionary with:

- "rgb": [B, H, W, C_out] RGB-like image.
- "selection_weights": [C_out, C_in] current weights.

Source code in cuvis_ai/node/concrete_selector.py
def forward(
    self,
    data: Tensor,
    context: Context | None = None,
    **_: Any,
) -> dict[str, Tensor]:
    """Apply Concrete/Gumbel-Softmax band selection.

    Parameters
    ----------
    data : Tensor
        Input tensor [B, H, W, C_in] in BHWC format.
    context : Context, optional
        Execution context with stage and epoch information.

    Returns
    -------
    dict[str, Tensor]
        Dictionary with:
        - ``"rgb"``: [B, H, W, C_out] RGB-like image.
        - ``"selection_weights"``: [C_out, C_in] current weights.
    """
    B, H, W, C_in = data.shape

    tau = self._current_tau(context)
    device = data.device

    if self.training and context is not None and context.stage == ExecutionStage.TRAIN:
        # Gumbel-Softmax sampling during training
        g = _sample_gumbel(self.logits.shape, device=device, eps=self.eps)
        weights = F.softmax((self.logits + g) / tau, dim=-1)  # [C_out, C_in]
    else:
        # Deterministic selection for val/test/inference
        if self.use_hard_inference:
            # Hard argmax → one-hot
            indices = torch.argmax(self.logits, dim=-1)  # [C_out]
            weights = torch.zeros_like(self.logits)
            weights.scatter_(1, indices.unsqueeze(-1), 1.0)
        else:
            # Softmax over logits at low temperature
            weights = F.softmax(self.logits / self.tau_end, dim=-1)

    # Weighted sum over spectral dimension: [B, H, W, C_in] x [C_out, C_in] -> [B, H, W, C_out]
    rgb = torch.einsum("bhwc,kc->bhwk", data, weights)

    return {
        "rgb": rgb,
        "selection_weights": weights,
    }
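
Usage sketch, assuming the import path cuvis_ai/node/concrete_selector.py shown above and that forward tolerates context=None (in which case the deterministic branch is taken):

>>> import torch
>>> from cuvis_ai.node.concrete_selector import ConcreteBandSelector
>>>
>>> selector = ConcreteBandSelector(input_channels=61, output_channels=3)
>>> cube = torch.rand(1, 64, 64, 61)       # [B, H, W, C_in] in [0, 1]
>>> out = selector.forward(data=cube)      # hard argmax weights outside training
>>> out["rgb"].shape                       # torch.Size([1, 64, 64, 3])
>>> selector.get_selected_bands()          # argmax band index per output channel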

Channel Selector

selector

Soft channel selector node for learnable channel selection.

SoftChannelSelector
SoftChannelSelector(
    n_select,
    input_channels,
    init_method="uniform",
    temperature_init=5.0,
    temperature_min=0.1,
    temperature_decay=0.9,
    hard=False,
    eps=1e-06,
    **kwargs,
)

Bases: Node

Soft channel selector with temperature-based Gumbel-Softmax selection.

This node learns to select a subset of input channels using differentiable channel selection with temperature annealing. Supports:

- Statistical initialization (uniform or importance-based)
- Gradient-based optimization with temperature scheduling
- Entropy and diversity regularization
- Hard selection at inference time

Parameters:

Name Type Description Default
n_select int

Number of channels to select

required
input_channels int

Number of input channels

required
init_method ('uniform', 'variance')

Initialization method for channel weights (default: "uniform")

"uniform"
temperature_init float

Initial temperature for Gumbel-Softmax (default: 5.0)

5.0
temperature_min float

Minimum temperature (default: 0.1)

0.1
temperature_decay float

Temperature decay factor per epoch (default: 0.9)

0.9
hard bool

If True, use hard selection at inference (default: False)

False
eps float

Small constant for numerical stability (default: 1e-6)

1e-06

Attributes:

Name Type Description
channel_logits Parameter or Tensor

Unnormalized channel importance scores [n_channels]

temperature float

Current temperature for Gumbel-Softmax

Source code in cuvis_ai/node/selector.py
def __init__(
    self,
    n_select: int,
    input_channels: int,
    init_method: Literal["uniform", "variance"] = "uniform",
    temperature_init: float = 5.0,
    temperature_min: float = 0.1,
    temperature_decay: float = 0.9,
    hard: bool = False,
    eps: float = 1e-6,
    **kwargs,
) -> None:
    self.n_select = n_select
    self.input_channels = input_channels
    self.init_method = init_method
    self.temperature_init = temperature_init
    self.temperature_min = temperature_min
    self.temperature_decay = temperature_decay
    self.hard = hard
    self.eps = eps

    super().__init__(
        n_select=n_select,
        input_channels=input_channels,
        init_method=init_method,
        temperature_init=temperature_init,
        temperature_min=temperature_min,
        temperature_decay=temperature_decay,
        hard=hard,
        eps=eps,
        **kwargs,
    )

    # Temperature tracking (not a parameter, managed externally)
    self.temperature = temperature_init
    self._n_channels = input_channels

    # Validate selection size
    if self.n_select > self._n_channels:
        raise ValueError(
            f"Cannot select {self.n_select} channels from {self._n_channels} available channels"  # nosec B608
        )

    # Initialize channel logits based on method - always as buffer
    if self.init_method == "uniform":
        # Uniform initialization
        logits = torch.zeros(self._n_channels)
    elif self.init_method == "variance":
        # Random initialization - will be refined with fit if called
        logits = torch.randn(self._n_channels) * 0.01
    else:
        raise ValueError(f"Unknown init_method: {self.init_method}")

    # Store as buffer initially
    self.register_buffer("channel_logits", logits)

    self._statistically_initialized = False
statistical_initialization
statistical_initialization(input_stream)

Initialize channel selection weights from data.

Parameters:

Name Type Description Default
input_stream InputStream

Iterator yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"data": tensor} where tensor is BHWC.

required
Source code in cuvis_ai/node/selector.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize channel selection weights from data.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is BHWC
    """
    # Collect statistics from first batch to determine n_channels
    first_batch = next(iter(input_stream))
    x = first_batch["data"]

    if x is None:
        raise ValueError("No data provided for selector initialization")

    self._n_channels = x.shape[-1]

    if self.n_select > self._n_channels:
        raise ValueError(
            f"Cannot select {self.n_select} channels from {self._n_channels} available channels"  # nosec B608
        )

    # Initialize channel logits based on method
    if self.init_method == "uniform":
        # Uniform initialization
        logits = torch.zeros(self._n_channels)
    elif self.init_method == "variance":
        # Importance-based initialization using channel variance
        all_data = []
        all_data.append(x)

        # Collect more data for better statistics
        for batch_data in input_stream:
            x_batch = batch_data["data"]
            if x_batch is not None:
                all_data.append(x_batch)

        # Concatenate and compute variance per channel
        X = torch.cat(all_data, dim=0)  # [B, H, W, C]
        X_flat = X.reshape(-1, X.shape[-1])  # [B*H*W, C]

        # Compute variance for each channel
        variance = X_flat.var(dim=0)  # [C]

        # Use log variance as initial logits (high variance = high importance)
        logits = torch.log(variance + self.eps)
    else:
        raise ValueError(f"Unknown init_method: {self.init_method}")

    # Store as buffer
    self.channel_logits.data[:] = logits.clone()
    self._statistically_initialized = True
unfreeze
unfreeze()

Convert channel logits buffer to trainable nn.Parameter.

Call this method to enable gradient-based optimization of channel selection weights. The logits will be converted from a buffer to an nn.Parameter, allowing gradient updates during training.

Example

>>> selector = SoftChannelSelector(n_select=10, input_channels=100)
>>> selector.unfreeze()  # Enable gradient training
>>> # Now channel selection weights can be optimized
Source code in cuvis_ai/node/selector.py
def unfreeze(self) -> None:
    """Convert channel logits buffer to trainable nn.Parameter.

    Call this method to enable gradient-based optimization of channel
    selection weights. The logits will be converted from a buffer to an
    nn.Parameter, allowing gradient updates during training.

    Example
    -------
    >>> selector = SoftChannelSelector(n_select=10, input_channels=100)
    >>> selector.unfreeze()  # Enable gradient training
    >>> # Now channel selection weights can be optimized
    """
    if self.channel_logits is not None:
        # Convert buffer to parameter
        self.channel_logits = nn.Parameter(self.channel_logits.clone())
    # Call parent to enable requires_grad
    super().unfreeze()
update_temperature
update_temperature(epoch=None, step=None)

Update temperature with decay schedule.

Parameters:

Name Type Description Default
epoch int

Current epoch number (used for per-epoch decay)

None
step int

Current training step (for more granular control)

None
Source code in cuvis_ai/node/selector.py
def update_temperature(self, epoch: int | None = None, step: int | None = None) -> None:
    """Update temperature with decay schedule.

    Parameters
    ----------
    epoch : int, optional
        Current epoch number (used for per-epoch decay)
    step : int, optional
        Current training step (for more granular control)
    """
    if epoch is not None:
        # Exponential decay per epoch
        self.temperature = max(
            self.temperature_min, self.temperature_init * (self.temperature_decay**epoch)
        )
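
The schedule above is a plain exponential decay clipped at temperature_min. A standalone illustration with the default settings:

>>> t_init, t_min, decay = 5.0, 0.1, 0.9
>>> for epoch in (0, 10, 20, 40):
...     print(epoch, round(max(t_min, t_init * decay**epoch), 3))
0 5.0
10 1.743
20 0.608
40 0.1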
get_selection_weights
get_selection_weights(hard=None)

Get current channel selection weights.

Parameters:

Name Type Description Default
hard bool

If True, use hard selection (top-k). If None, uses self.hard.

None

Returns:

Type Description
Tensor

Selection weights [n_channels] summing to n_select

Source code in cuvis_ai/node/selector.py
def get_selection_weights(self, hard: bool | None = None) -> Tensor:
    """Get current channel selection weights.

    Parameters
    ----------
    hard : bool, optional
        If True, use hard selection (top-k). If None, uses self.hard.

    Returns
    -------
    Tensor
        Selection weights [n_channels] summing to n_select
    """
    if hard is None:
        hard = self.hard and not self.training

    if hard:
        # Hard selection: top-k channels
        _, top_indices = torch.topk(self.channel_logits, self.n_select)
        weights = torch.zeros_like(self.channel_logits)
        weights[top_indices] = 1.0
    else:
        # Soft selection with Gumbel-Softmax
        # First, compute selection probabilities
        probs = F.softmax(self.channel_logits / self.temperature, dim=-1)

        # Scale to sum to n_select instead of 1
        weights = probs * self.n_select

    return weights
forward
forward(data, **_)

Apply soft channel selection to input.

Parameters:

Name Type Description Default
data Tensor

Input tensor [B, H, W, C]

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "selected" key containing reweighted channels and optional "weights" key containing selection weights

Source code in cuvis_ai/node/selector.py
def forward(self, data: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply soft channel selection to input.

    Parameters
    ----------
    data : Tensor
        Input tensor [B, H, W, C]

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "selected" key containing reweighted channels
        and optional "weights" key containing selection weights
    """
    # Get selection weights
    weights = self.get_selection_weights()

    # Ensure weights are on same device
    # weights = weights.to(data.device) # no need

    # Apply channel-wise weighting: [B, H, W, C] * [C]
    selected = data * weights.view(1, 1, 1, -1)

    # Prepare output dictionary - weights always exposed for loss/metric nodes
    outputs = {"selected": selected, "weights": weights}

    return outputs
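
Usage sketch of the two-phase workflow (statistical initialization, then gradient training), assuming the import path cuvis_ai/node/selector.py shown above:

>>> import torch
>>> from cuvis_ai.node.selector import SoftChannelSelector
>>>
>>> selector = SoftChannelSelector(n_select=10, input_channels=100)
>>> data = torch.rand(2, 32, 32, 100)       # [B, H, W, C]
>>> out = selector.forward(data=data)
>>> out["selected"].shape                   # torch.Size([2, 32, 32, 100])
>>> out["weights"].sum()                    # ~10: soft weights scale to n_select
>>>
>>> selector.unfreeze()                     # enable gradient updates on logits
>>> selector.update_temperature(epoch=5)    # anneal once per epoch during training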
TopKIndices
TopKIndices(k, **kwargs)

Bases: Node

Utility node that surfaces the top-k channel indices from selector weights.

This node extracts the indices of the top-k weighted channels from a selector's weight vector. Useful for introspection and reporting which channels were selected.

Parameters:

Name Type Description Default
k int

Number of top indices to return

required

Attributes:

Name Type Description
k int

Number of top indices to return

Source code in cuvis_ai/node/selector.py
def __init__(self, k: int, **kwargs: Any) -> None:
    self.k = int(k)

    # Extract Node base parameters from kwargs to avoid duplication
    name = kwargs.pop("name", None)
    execution_stages = kwargs.pop("execution_stages", None)

    super().__init__(
        name=name,
        execution_stages=execution_stages,
        k=self.k,
        **kwargs,
    )
forward
forward(weights, **_)

Return the indices of the top-k weighted channels.

Parameters:

Name Type Description Default
weights Tensor

Channel selection weights [n_channels]

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "indices" key containing top-k indices

Source code in cuvis_ai/node/selector.py
def forward(self, weights: torch.Tensor, **_: Any) -> dict[str, torch.Tensor]:
    """Return the indices of the top-k weighted channels.

    Parameters
    ----------
    weights : torch.Tensor
        Channel selection weights [n_channels]

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "indices" key containing top-k indices
    """
    top_k = min(self.k, weights.shape[-1]) if weights.numel() else 0
    if top_k == 0:
        return {"indices": torch.zeros(0, dtype=torch.int64, device=weights.device)}

    _, indices = torch.topk(weights, top_k)
    return {"indices": indices}

Deep Learning Nodes

Nodes implementing deep learning components.

AdaCLIP

adaclip

AdaCLIP Anomaly Detection Nodes.

This module provides nodes for zero-shot anomaly detection using the AdaCLIP (Adaptive CLIP) model. Two implementations are available:

  • AdaCLIPLocalNode: Loads and runs the CLIP vision model locally for inference
  • AdaCLIPAPINode: Calls the AdaCLIP HuggingFace Space API for inference

AdaCLIP uses CLIP's vision features to detect anomalies based on text prompts, enabling zero-shot anomaly detection without training data.

See Also

cuvis_ai_core.node.huggingface : Base classes for HuggingFace model nodes

AdaCLIPLocalNode
AdaCLIPLocalNode(
    model_name="AdaCLIP",
    cache_dir=None,
    text_prompt="normal: lentils, anomaly: stones",
    revision=None,
    **kwargs,
)

Bases: HuggingFaceLocalNode

AdaCLIP anomaly detection with local HF loading.

Source code in cuvis_ai/node/adaclip.py
def __init__(
    self,
    model_name: str = "AdaCLIP",
    cache_dir: str | None = None,
    text_prompt: str = "normal: lentils, anomaly: stones",
    revision: str | None = None,
    **kwargs,
) -> None:
    self.text_prompt = text_prompt
    self.revision = revision

    super().__init__(
        model_name=model_name,
        cache_dir=cache_dir,
        text_prompt=text_prompt,
        **kwargs,
    )
forward
forward(image, text_prompt=None, context=None, **kwargs)

Run AdaCLIP anomaly detection with local CLIP model.

Processes images through CLIP vision encoder and generates anomaly scores based on feature norms. Supports gradient passthrough for training pipelines.

Parameters:

Name Type Description Default
image Tensor

RGB image [B, H, W, 3] in range [0, 1] or [0, 255].

required
text_prompt str

Text description for anomaly detection. If None, uses self.text_prompt. Note: Current implementation uses feature norms; text prompts will be integrated in future versions.

None
context Any

Pipeline execution context (unused, for compatibility).

None
**kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
dict[str, Tensor]

Dictionary containing:

- "anomaly_mask" : Tensor [B, 1, 1, 1] - Binary anomaly predictions
- "anomaly_scores" : Tensor [B, 1, 1, 1] - Normalized anomaly scores [0, 1]

Raises:

Type Description
RuntimeError

If CLIP inference fails or model is not properly loaded.

Source code in cuvis_ai/node/adaclip.py
def forward(
    self,
    image: Tensor,
    text_prompt: str | None = None,
    context: Any | None = None,  # context captured for pipeline compatibility
    **kwargs: Any,
) -> dict[str, Tensor]:
    """Run AdaCLIP anomaly detection with local CLIP model.

    Processes images through CLIP vision encoder and generates anomaly
    scores based on feature norms. Supports gradient passthrough for
    training pipelines.

    Parameters
    ----------
    image : Tensor
        RGB image [B, H, W, 3] in range [0, 1] or [0, 255].
    text_prompt : str, optional
        Text description for anomaly detection. If None, uses self.text_prompt.
        Note: Current implementation uses feature norms; text prompts will be
        integrated in future versions.
    context : Any, optional
        Pipeline execution context (unused, for compatibility).
    **kwargs : Any
        Additional keyword arguments (unused).

    Returns
    -------
    dict[str, Tensor]
        Dictionary containing:
        - "anomaly_mask" : Tensor [B, 1, 1, 1] - Binary anomaly predictions
        - "anomaly_scores" : Tensor [B, 1, 1, 1] - Normalized anomaly scores [0, 1]

    Raises
    ------
    RuntimeError
        If CLIP inference fails or model is not properly loaded.
    """
    # Use instance variable if text_prompt not provided
    if text_prompt is None:
        text_prompt = self.text_prompt

    image_processed = self._preprocess_image(image)

    # Ensure model is on the same device as input
    # Get model's current device
    model_device = next(self.model.parameters()).device
    if image_processed.device != model_device:
        # Move model to input device (more efficient than moving data back)
        self.model.to(image_processed.device)

    try:
        # CLIP models expect pixel_values as keyword argument
        # Keep gradients enabled for gradient passthrough
        outputs = self.model(pixel_values=image_processed)

        # CLIPVisionModel returns BaseModelOutputWithPooling with:
        # - pooler_output: [B, 768] global image features
        # - last_hidden_state: [B, 50, 768] patch-level features
        scores = outputs.pooler_output  # [B, 768]

        # Reshape global features to spatial format [B, 1, 1, 1]
        # Use feature norm as anomaly score
        batch_size = scores.shape[0]
        scores = scores.norm(dim=-1, keepdim=True)  # [B, 1]
        scores = scores.view(batch_size, 1, 1, 1)  # [B, 1, 1, 1]

        # Normalize scores to [0, 1] range for interpretability
        # Use min-max normalization to preserve gradients better
        scores_min = scores.min()
        scores_max = scores.max()
        if scores_max > scores_min:
            scores = (scores - scores_min) / (scores_max - scores_min + 1e-8)
        else:
            # All scores are the same - normalize to 0.5
            scores = torch.ones_like(scores) * 0.5

        # Create binary anomaly mask (threshold at 0.5)
        anomaly_mask = scores > 0.5

        return {
            "anomaly_mask": anomaly_mask,
            "anomaly_scores": scores,
        }
    except Exception as exc:  # pragma: no cover - defensive path
        logger.error(f"Local AdaCLIP inference failed: {exc}")
        raise RuntimeError(
            f"AdaCLIP local inference failed: {exc}\n"
            f"Model: {self.model_name}\n"
            f"Input shape: {image.shape}\n"
            f"Text prompt: {text_prompt}"
        ) from exc
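
Usage sketch (model weights are resolved through the HuggingFace machinery of the base class, which this page does not document; treat the defaults below as assumptions):

>>> import torch
>>> from cuvis_ai.node.adaclip import AdaCLIPLocalNode
>>>
>>> node = AdaCLIPLocalNode(text_prompt="normal: lentils, anomaly: stones")
>>> rgb = torch.rand(2, 224, 224, 3)    # [B, H, W, 3] in [0, 1]
>>> out = node.forward(image=rgb)
>>> out["anomaly_scores"].shape         # torch.Size([2, 1, 1, 1]), scores in [0, 1]
>>> out["anomaly_mask"]                 # boolean tensor, scores > 0.5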
AdaCLIPAPINode
AdaCLIPAPINode(
    space_url="Caoyunkang/AdaCLIP",
    dataset_option="All",
    text_prompt="normal: lentils, anomaly: stones",
    **kwargs,
)

Bases: HuggingFaceAPINode

AdaCLIP anomaly detection via HuggingFace Spaces API.

This node calls the AdaCLIP Space for zero-shot anomaly detection. The API backend is non-differentiable and suitable for inference only.

Parameters:

Name Type Description Default
space_url str

AdaCLIP Space URL (default: "Caoyunkang/AdaCLIP")

'Caoyunkang/AdaCLIP'
dataset_option str

Dataset selection option (default: "All")

'All'
text_prompt str

Text prompt for anomaly detection (default: "normal: lentils, anomaly: stones")

'normal: lentils, anomaly: stones'
**kwargs

Additional arguments passed to HuggingFaceAPINode

{}

Examples:

>>> # Create node
>>> adaclip = AdaCLIPAPINode()
>>>
>>> # Run inference
>>> rgb_image = torch.rand(1, 224, 224, 3)  # BHWC format
>>> result = adaclip.forward(image=rgb_image)
>>> anomaly_mask = result["anomaly_mask"]  # [B, H, W, 1]
Source code in cuvis_ai/node/adaclip.py
def __init__(
    self,
    space_url: str = "Caoyunkang/AdaCLIP",
    dataset_option: str = "All",
    text_prompt: str = "normal: lentils, anomaly: stones",
    **kwargs,
) -> None:
    self.dataset_option = dataset_option
    self.text_prompt = text_prompt

    super().__init__(
        space_url=space_url,
        dataset_option=dataset_option,
        text_prompt=text_prompt,
        **kwargs,
    )
forward
forward(
    image, text_prompt=None, dataset_option=None, **kwargs
)

Run AdaCLIP anomaly detection via API.

Parameters:

Name Type Description Default
image Tensor

RGB image [B, H, W, 3] in BHWC format

required
text_prompt str

Text description of anomaly to detect. If None, uses self.text_prompt.

None
dataset_option str

Dataset selection option passed to the Space API (note: unlike text_prompt, it is not substituted from self.dataset_option when None).

None
**kwargs

Additional arguments (unused)

{}

Returns:

Type Description
dict[str, Tensor]

Dictionary with "anomaly_mask" and optionally "anomaly_scores"

Raises:

Type Description
RuntimeError

If API call fails

ValueError

If image format is invalid

Source code in cuvis_ai/node/adaclip.py
def forward(
    self,
    image: Tensor,
    text_prompt: str | None = None,
    dataset_option: str | None = None,
    **kwargs: Any,
) -> dict[str, Tensor]:
    """Run AdaCLIP anomaly detection via API.

    Parameters
    ----------
    image : Tensor
        RGB image [B, H, W, 3] in BHWC format
    text_prompt : str, optional
        Text description of anomaly to detect. If None, uses self.text_prompt.
    dataset_option : str, optional
        Dataset selection option passed to the Space API.
    **kwargs
        Additional arguments (unused)

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "anomaly_mask" and optionally "anomaly_scores"

    Raises
    ------
    RuntimeError
        If API call fails
    ValueError
        If image format is invalid
    """

    # Use instance variable if text_prompt not provided
    if text_prompt is None:
        text_prompt = self.text_prompt

    # Process each image in batch
    batch_size = image.shape[0]
    masks = []

    for i in range(batch_size):
        img = image[i]  # [H, W, 3]

        img_np = img.detach().cpu().numpy()

        # Normalize to [0, 255] if in [0, 1]
        if img_np.max() <= 1.0:
            img_np = (img_np * 255).astype(np.uint8)
        else:
            img_np = img_np.astype(np.uint8)

        # Convert to PIL Image
        pil_img = Image.fromarray(img_np)

        try:
            # Call API
            logger.debug(f"Calling AdaCLIP API for image {i + 1}/{batch_size}")
            with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_img:
                pil_img.save(tmp_img.name)
                tmp_path = tmp_img.name

            try:
                result = self.client.predict(
                    handle_file(tmp_path),
                    text_prompt,
                    dataset_option,
                    api_name="/predict",
                )
            finally:
                try:
                    os.remove(tmp_path)
                except OSError:
                    logger.warning(f"Failed to remove temp image: {tmp_path}")

            # Parse result
            # Note: Actual return format depends on AdaCLIP Space implementation
            # The Space currently returns (output_image_path, anomaly_score_str)
            if isinstance(result, np.ndarray):
                mask_np = result
            elif isinstance(result, (list, tuple)):
                first = result[0]
                if isinstance(first, np.ndarray):
                    mask_np = first
                elif isinstance(first, str):
                    # Gradio returns a temporary file path – load and convert to array
                    from PIL import Image as PILImage

                    mask_np = np.array(PILImage.open(first))
                else:
                    raise ValueError(
                        f"Unexpected first element type in result tuple: {type(first)}"
                    )
            else:
                raise ValueError(f"Unexpected API result type: {type(result)}")

            mask = torch.from_numpy(mask_np)

            # Ensure correct shape [H, W, 1]
            if mask.dim() == 2:
                mask = mask.unsqueeze(-1)
            elif mask.dim() == 3 and mask.shape[-1] == 3:
                # Convert RGB mask to single channel
                mask = mask.float().mean(dim=-1, keepdim=True)

            # Resize to original spatial resolution if needed
            orig_h, orig_w = img.shape[0], img.shape[1]
            if mask.shape[0] != orig_h or mask.shape[1] != orig_w:
                # interpolate expects NCHW
                mask = mask.permute(2, 0, 1).unsqueeze(0).float()
                mask = torch.nn.functional.interpolate(
                    mask,
                    size=(orig_h, orig_w),
                    mode="bilinear",
                    align_corners=False,
                )
                mask = mask.squeeze(0).permute(1, 2, 0)

            # Convert to binary mask
            if mask.dtype != torch.bool:
                mask = mask > 0

            masks.append(mask)

        except Exception as e:
            logger.error(f"API call failed for image {i + 1}/{batch_size}: {e}")
            raise RuntimeError(
                f"AdaCLIP API call failed: {e}\n"
                f"Space: {self.space_url}\n"
                f"Text prompt: {text_prompt}"
            ) from e

    # Stack batch
    anomaly_mask = torch.stack(masks, dim=0)  # [B, H, W, 1]

    return {
        "anomaly_mask": anomaly_mask,
    }

Analysis & Dimensionality Reduction

Nodes for dimensionality reduction and feature extraction.

PCA

pca

Trainable PCA node for dimensionality reduction with gradient-based optimization.

TrainablePCA
TrainablePCA(
    num_channels,
    n_components,
    whiten=False,
    init_method="svd",
    eps=1e-06,
    **kwargs,
)

Bases: Node

Trainable PCA node with orthogonality regularization.

This node performs Principal Component Analysis (PCA) for dimensionality reduction and can be trained end-to-end with gradient descent. It supports:

- Statistical initialization from data
- Gradient-based fine-tuning with orthogonality constraints
- Explained variance tracking

Parameters:

Name Type Description Default
n_components int

Number of principal components to retain

required
whiten bool

If True, scale components by explained variance (default: False)

False
init_method ('svd', 'random')

Initialization method for components (default: "svd")

"svd"
eps float

Small constant for numerical stability (default: 1e-6)

1e-06

Attributes:

Name Type Description
components Parameter or Tensor

Principal components matrix [n_components, n_features]

mean Tensor

Feature-wise mean [n_features]

explained_variance Tensor

Variance explained by each component [n_components]

Source code in cuvis_ai/node/pca.py
def __init__(
    self,
    num_channels: int,
    n_components: int,
    whiten: bool = False,
    init_method: Literal["svd", "random"] = "svd",
    eps: float = 1e-6,
    **kwargs,
) -> None:
    self.n_components = n_components
    self.whiten = whiten
    self.init_method = init_method
    self.eps = eps

    super().__init__(
        num_channels=num_channels,
        n_components=n_components,
        whiten=whiten,
        init_method=init_method,
        eps=eps,
        **kwargs,
    )

    # Buffers for statistical initialization (private to avoid conflicts with output ports)
    self.register_buffer("_mean", torch.empty(num_channels))
    self.register_buffer("_explained_variance", torch.empty(n_components))
    self.register_buffer("_components", torch.empty(n_components, num_channels))

    self._statistically_initialized = False
statistical_initialization
statistical_initialization(input_stream)

Initialize PCA components from data using SVD.

Parameters:

Name Type Description Default
input_stream InputStream

Input stream yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"data": tensor} where tensor is BHWC.

required
Source code in cuvis_ai/node/pca.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize PCA components from data using SVD.

    Parameters
    ----------
    input_stream : InputStream
        Input stream yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is BHWC
    """
    # TODO: this should not concatenate all data and then do SVD - this is not scalable.
    # Either do incremental PCA or use a subset of data.
    # Collect all data
    all_data = []
    for batch_data in input_stream:
        x = batch_data["data"]
        if x is not None:
            # Flatten spatial dimensions: [B, H, W, C] -> [B*H*W, C]
            flat = x.reshape(-1, x.shape[-1])
            all_data.append(flat)

    if not all_data:
        raise ValueError("No data provided for PCA initialization")

    # Concatenate all samples
    X = torch.cat(all_data, dim=0)  # [N, C]

    # Compute mean and center data
    self._mean = X.mean(dim=0)  # [C]
    X_centered = X - self._mean  # [N, C]

    # Compute SVD
    # X_centered = U @ S @ V.T where V contains principal components
    U, S, Vt = torch.linalg.svd(X_centered, full_matrices=False)

    # Extract top n_components and store as buffer
    self._components = Vt[: self.n_components, :].clone()  # [n_components, C]

    # Compute explained variance
    # Variance = (S^2) / (N - 1)
    variance = (S**2) / (X.shape[0] - 1)
    self._explained_variance = variance[: self.n_components].clone()  # [n_components]

    self._statistically_initialized = True
unfreeze
unfreeze()

Convert components buffer to trainable nn.Parameter.

Call this method after fit() to enable gradient-based training of the principal components. The components will be converted from a buffer to an nn.Parameter, allowing gradient updates during training.

Example

>>> pca.statistical_initialization(input_stream)  # Statistical initialization
>>> pca.unfreeze()  # Enable gradient training
>>> # Now PCA components can be fine-tuned with gradient descent
Source code in cuvis_ai/node/pca.py
def unfreeze(self) -> None:
    """Convert components buffer to trainable nn.Parameter.

    Call this method after fit() to enable gradient-based training of the
    principal components. The components will be converted from a buffer
    to an nn.Parameter, allowing gradient updates during training.

    Example
    -------
    >>> pca.statistical_initialization(input_stream)  # Statistical initialization
    >>> pca.unfreeze()  # Enable gradient training
    >>> # Now PCA components can be fine-tuned with gradient descent
    """
    if self._components.numel() > 0:
        # Convert buffer to parameter
        self._components = nn.Parameter(self._components.clone())
    # Call parent to enable requires_grad
    super().unfreeze()  # could this have unintended side effects? like the graph be unfrozen?
forward
forward(data, **_)

Project data onto principal components.

Parameters:

Name Type Description Default
data Tensor

Input tensor [B, H, W, C]

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "projected" key containing PCA-projected data

Source code in cuvis_ai/node/pca.py
def forward(self, data: Tensor, **_: Any) -> dict[str, Tensor]:
    """Project data onto principal components.

    Parameters
    ----------
    data : Tensor
        Input tensor [B, H, W, C]

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "projected" key containing PCA-projected data
    """
    if not self._statistically_initialized:
        raise RuntimeError("PCA not initialized. Call statistical_initialization() first.")

    B, H, W, C = data.shape

    # Flatten spatial dimensions
    x_flat = data.reshape(-1, C)  # [B*H*W, C]

    # Ensure mean is on the same device as input
    mean = self._mean.to(data.device)

    # Center data
    x_centered = x_flat - mean  # [B*H*W, C]

    # Ensure components are on the same device as input
    components = (
        self._components.to(data.device)
        if isinstance(self._components, Tensor)
        else self._components
    )

    # Project onto components: X_proj = X @ components.T
    x_proj = x_centered @ components.T  # [B*H*W, n_components]

    # Whiten if requested
    if self.whiten:
        # Ensure explained_variance is on the same device
        explained_variance = self._explained_variance.to(data.device)
        # Scale by 1/sqrt(explained_variance)
        scale = 1.0 / torch.sqrt(explained_variance + self.eps)
        x_proj = x_proj * scale

    # Reshape back to spatial dimensions
    x_proj = x_proj.reshape(B, H, W, self.n_components)

    # Prepare output dictionary
    outputs = {"projected": x_proj}

    # Add optional outputs for loss/metric nodes
    # Expose explained variance ratio
    if self._explained_variance.numel() > 0:
        total_variance = self._explained_variance.sum()
        variance_ratio = self._explained_variance / (total_variance + self.eps)
        outputs["explained_variance_ratio"] = variance_ratio.to(data.device)

    # Expose components for loss/metric nodes
    if self._components.numel() > 0:
        outputs["components"] = self._components

    return outputs
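
Usage sketch; a plain list of {"data": tensor} batches stands in for InputStream here, based on how statistical_initialization iterates its argument (an assumption, not a documented contract):

>>> import torch
>>> from cuvis_ai.node.pca import TrainablePCA
>>>
>>> pca = TrainablePCA(num_channels=61, n_components=8)
>>> stream = [{"data": torch.rand(2, 16, 16, 61)} for _ in range(4)]
>>> pca.statistical_initialization(stream)
>>>
>>> out = pca.forward(data=torch.rand(1, 16, 16, 61))
>>> out["projected"].shape                 # torch.Size([1, 16, 16, 8])
>>> out["explained_variance_ratio"].sum()  # ~1.0 over the retained components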

Visualization Nodes

Nodes for creating visualizations and TensorBoard logging.

Visualizations

visualizations

Visualization sink nodes for monitoring training progress (port-based architecture).

CubeRGBVisualizer
CubeRGBVisualizer(name=None, up_to=5)

Bases: Node

Creates false-color RGB images from hyperspectral cube using channel weights.

Selects the 3 channels with the highest weights as the R, G, and B channels and creates a false-color visualization with wavelength annotations.

Source code in cuvis_ai/node/visualizations.py
def __init__(self, name: str | None = None, up_to: int = 5) -> None:
    super().__init__(name=name, execution_stages={ExecutionStage.INFERENCE, ExecutionStage.VAL})
    self.up_to = up_to
forward
forward(cube, weights, wavelengths, context)

Generate false-color RGB visualizations from hyperspectral cube.

Selects the 3 channels with highest weights and creates RGB images with wavelength annotations. Also generates a bar chart showing channel weights with the selected channels highlighted.

Parameters:

Name Type Description Default
cube Tensor

Hyperspectral cube [B, H, W, C].

required
weights Tensor

Channel selection weights [C] indicating importance of each channel.

required
wavelengths Tensor

Wavelengths for each channel [C] in nanometers.

required
context Context

Execution context with stage, epoch, batch_idx information.

required

Returns:

Type Description
dict[str, list[Artifact]]

Dictionary with "artifacts" key containing list of visualization artifacts.

Source code in cuvis_ai/node/visualizations.py
def forward(self, cube, weights, wavelengths, context) -> dict[str, list[Artifact]]:
    """Generate false-color RGB visualizations from hyperspectral cube.

    Selects the 3 channels with highest weights and creates RGB images
    with wavelength annotations. Also generates a bar chart showing
    channel weights with the selected channels highlighted.

    Parameters
    ----------
    cube : Tensor
        Hyperspectral cube [B, H, W, C].
    weights : Tensor
        Channel selection weights [C] indicating importance of each channel.
    wavelengths : Tensor
        Wavelengths for each channel [C] in nanometers.
    context : Context
        Execution context with stage, epoch, batch_idx information.

    Returns
    -------
    dict[str, list[Artifact]]
        Dictionary with "artifacts" key containing list of visualization artifacts.
    """
    top3_indices = torch.topk(weights, k=3).indices.cpu().numpy()
    top3_wavelengths = wavelengths[top3_indices]

    batch_size = min(cube.shape[0], self.up_to)
    artifacts = []

    for b in range(batch_size):
        rgb_channels = cube[b, :, :, top3_indices].cpu().numpy()

        rgb_img = (rgb_channels - rgb_channels.min()) / (
            rgb_channels.max() - rgb_channels.min() + 1e-8
        )

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        ax1.imshow(rgb_img)
        ax1.set_title(
            f"False RGB: R={top3_wavelengths[0]:.1f}nm, "
            f"G={top3_wavelengths[1]:.1f}nm, B={top3_wavelengths[2]:.1f}nm"
        )
        ax1.axis("off")

        ax2.bar(range(len(wavelengths)), weights.detach().cpu().numpy())
        ax2.scatter(
            top3_indices,
            weights[top3_indices].detach().cpu().numpy(),
            c="red",
            s=100,
            zorder=3,
        )
        ax2.set_xlabel("Channel Index")
        ax2.set_ylabel("Weight")
        ax2.set_title("Channel Selection Weights")
        ax2.grid(True, alpha=0.3)

        for idx in top3_indices:
            ax2.annotate(
                f"{wavelengths[idx]:.0f}nm",
                xy=(idx, weights[idx].item()),
                xytext=(0, 10),
                textcoords="offset points",
                ha="center",
                fontsize=8,
            )

        plt.tight_layout()

        img_array = fig_to_array(fig, dpi=150)

        artifact = Artifact(
            name=f"viz_rgb_sample_{b}",
            value=img_array,
            el_id=b,
            desc=f"False RGB visualization for sample {b}",
            type=ArtifactType.IMAGE,
        )
        artifacts.append(artifact)
        plt.close(fig)

    return {"artifacts": artifacts}
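CubeRGBVisualizer has no Examples section above, so here is a hedged wiring sketch in the style of the other visualizers; the `cube`, `weights`, and `wavelengths` port names follow the forward() signature, while `data_node` and `band_selector` are illustrative upstream nodes:

>>> rgb_viz = CubeRGBVisualizer(up_to=5)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (data_node.cube, rgb_viz.cube),
...     (band_selector.weights, rgb_viz.weights),
...     (data_node.wavelengths, rgb_viz.wavelengths),
...     (rgb_viz.artifacts, tensorboard_node.artifacts),
... )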
PCAVisualization
PCAVisualization(up_to=None, **kwargs)

Bases: Node

Visualize PCA-projected data with scatter and image plots.

Creates visualizations for each batch element showing:

1. Scatter plot of H*W points in 2D PC space (using the first 2 PCs)
2. Image representation of the 2D projection reshaped to [H, W, 2]

Points in scatter plot are colored by spatial position. Returns artifacts for monitoring systems.

Executes only during validation stage.

Parameters:

Name Type Description Default
up_to int

Maximum number of batch elements to visualize. If None, visualizes all (default: None)

None

Examples:

>>> pca_viz = PCAVisualization(up_to=10)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (pca.projected, pca_viz.data),
...     (pca_viz.artifacts, tensorboard_node.artifacts),
... )
Source code in cuvis_ai/node/visualizations.py
def __init__(self, up_to: int | None = None, **kwargs) -> None:
    self.up_to = up_to

    super().__init__(execution_stages={ExecutionStage.VAL}, up_to=up_to, **kwargs)
forward
forward(data, context)

Create PCA projection visualizations as Artifact objects.

Parameters:

Name Type Description Default
data Tensor

PCA-projected data tensor [B, H, W, C] (uses first 2 components)

required
context Context

Execution context with stage, epoch, batch_idx

required

Returns:

Type Description
dict

Dictionary with "artifacts" key containing list of Artifact objects

Source code in cuvis_ai/node/visualizations.py
def forward(self, data: torch.Tensor, context: Context) -> dict:
    """Create PCA projection visualizations as Artifact objects.

    Parameters
    ----------
    data : torch.Tensor
        PCA-projected data tensor [B, H, W, C] (uses first 2 components)
    context : Context
        Execution context with stage, epoch, batch_idx

    Returns
    -------
    dict
        Dictionary with "artifacts" key containing list of Artifact objects
    """
    # Convert to numpy
    data_np = data.detach().cpu().numpy()

    # Handle input shape: [B, H, W, C]
    if data_np.ndim != 4:
        raise ValueError(f"Expected 4D input [B, H, W, C], got shape: {data_np.shape}")

    B, H, W, C = data_np.shape

    if C < 2:
        raise ValueError(f"Expected at least 2 components, got {C}")

    # Extract context information
    stage = context.stage.value
    epoch = context.epoch
    batch_idx = context.batch_idx

    # Determine how many images to visualize from this batch
    up_to_batch = B if self.up_to is None else min(B, self.up_to)

    # List to collect artifacts
    artifacts = []

    # Loop through each batch element
    for i in range(up_to_batch):
        # Get projection for this batch element: [H, W, C]
        projection = data_np[i]

        # Use only first 2 components
        projection_2d = projection[:, :, :2]  # [H, W, 2]

        # Flatten spatial dimensions for scatter plot
        projection_flat = projection_2d.reshape(-1, 2)  # [H*W, 2]

        # Create spatial position colors using 2D HSV encoding
        # x-coordinate maps to Hue (0-1)
        # y-coordinate maps to Saturation (0-1)
        # Value is constant at 1.0 for brightness
        y_coords, x_coords = np.meshgrid(np.arange(H), np.arange(W), indexing="ij")

        # Normalize coordinates to [0, 1]
        x_norm = x_coords / (W - 1) if W > 1 else np.zeros_like(x_coords)
        y_norm = y_coords / (H - 1) if H > 1 else np.zeros_like(y_coords)

        # Create HSV colors: H from x, S from y, V constant
        hsv_colors = np.stack(
            [
                x_norm.flatten(),  # Hue from x-coordinate
                y_norm.flatten(),  # Saturation from y-coordinate
                np.ones(H * W),  # Value constant at 1.0
            ],
            axis=-1,
        )

        # Convert HSV to RGB for matplotlib
        from matplotlib.colors import hsv_to_rgb

        rgb_colors = hsv_to_rgb(hsv_colors)

        # Create figure with 3 subplots
        fig, axes = plt.subplots(1, 3, figsize=(20, 6))

        # Subplot 1: Scatter plot colored by 2D spatial position
        axes[0].scatter(
            projection_flat[:, 0],
            projection_flat[:, 1],
            c=rgb_colors,
            alpha=0.6,
            s=20,
        )
        axes[0].set_xlabel("PC1 (1st component)")
        axes[0].set_ylabel("PC2 (2nd component)")
        axes[0].set_title(f"PCA Scatter - {stage} E{epoch} B{batch_idx} Img{i}")
        axes[0].grid(True, alpha=0.3)

        # Subplot 2: Spatial reference image
        # Create reference image showing the spatial color coding
        spatial_reference = hsv_to_rgb(
            np.stack([x_norm, y_norm, np.ones_like(x_norm)], axis=-1)
        )
        axes[1].imshow(spatial_reference, aspect="auto")
        axes[1].set_xlabel("Width (→ Hue)")
        axes[1].set_ylabel("Height (→ Saturation)")
        axes[1].set_title("Spatial Color Reference")

        # Subplot 3: Image representation
        # Normalize each channel to [0, 1] for visualization
        pc1_norm = (projection_2d[:, :, 0] - projection_2d[:, :, 0].min()) / (
            projection_2d[:, :, 0].max() - projection_2d[:, :, 0].min() + 1e-8
        )
        pc2_norm = (projection_2d[:, :, 1] - projection_2d[:, :, 1].min()) / (
            projection_2d[:, :, 1].max() - projection_2d[:, :, 1].min() + 1e-8
        )

        # Create RGB image: PC1 in red channel, PC2 in green channel, zeros in blue
        img_rgb = np.stack([pc1_norm, pc2_norm, np.zeros_like(pc1_norm)], axis=-1)

        axes[2].imshow(img_rgb, aspect="auto")
        axes[2].set_xlabel("Width")
        axes[2].set_ylabel("Height")
        axes[2].set_title("PCA Image (R=PC1, G=PC2)")

        # Add statistics text
        pc1_min = projection_2d[:, :, 0].min()
        pc1_max = projection_2d[:, :, 0].max()
        pc2_min = projection_2d[:, :, 1].min()
        pc2_max = projection_2d[:, :, 1].max()
        stats_text = (
            f"Shape: [{H}, {W}]\n"
            f"Points: {H * W}\n"
            f"PC1 range: [{pc1_min:.3f}, {pc1_max:.3f}]\n"
            f"PC2 range: [{pc2_min:.3f}, {pc2_max:.3f}]"
        )
        fig.text(
            0.98,
            0.5,
            stats_text,
            ha="left",
            va="center",
            bbox={
                "boxstyle": "round",
                "facecolor": "wheat",
                "alpha": 0.5,
            },
        )

        plt.tight_layout()

        # Convert figure to numpy array (RGB format)
        img_array = fig_to_array(fig, dpi=150)

        # Create Artifact object
        artifact = Artifact(
            name=f"pca_projection_img{i:02d}",
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
            value=img_array,
            el_id=i,
            desc=f"PCA projection for {stage} epoch {epoch}, batch {batch_idx}, image {i}",
            type=ArtifactType.IMAGE,
        )
        artifacts.append(artifact)

        progress_total = self.up_to if self.up_to else B
        description = (
            f"Created PCA projection artifact ({i + 1}/{progress_total}): {artifact.name}"
        )
        logger.info(description)

        plt.close(fig)

    # Return artifacts
    return {"artifacts": artifacts}
AnomalyMask
AnomalyMask(channel, up_to=None, **kwargs)

Bases: Node

Visualize anomaly detection with GT and predicted masks.

Creates side-by-side visualizations showing ground truth masks, predicted masks, and overlay comparisons on hyperspectral cube images. The overlay shows:

- Green: True Positives (correct anomaly detection)
- Red: False Positives (false alarms)
- Yellow: False Negatives (missed anomalies)

Also displays IoU and other metrics. Returns a list of Artifact objects for logging to monitoring systems.

Executes during validation, test, and inference stages.

Parameters:

Name Type Description Default
channel int

Channel index to use for cube visualization (required)

required
up_to int

Maximum number of images to visualize. If None, visualizes all (default: None)

None

Examples:

>>> decider = BinaryDecider(threshold=0.2)
>>> viz_mask = AnomalyMask(channel=30, up_to=5)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (logit_head.logits, decider.data),
...     (decider.decisions, viz_mask.decisions),
...     (data_node.mask, viz_mask.mask),
...     (data_node.cube, viz_mask.cube),
...     (viz_mask.artifacts, tensorboard_node.artifacts),
... )
Source code in cuvis_ai/node/visualizations.py
def __init__(self, channel: int, up_to: int | None = None, **kwargs) -> None:
    self.channel = channel
    self.up_to = up_to

    super().__init__(
        execution_stages={ExecutionStage.VAL, ExecutionStage.TEST, ExecutionStage.INFERENCE},
        channel=channel,
        up_to=up_to,
        **kwargs,
    )
forward
forward(decisions, cube, context, mask=None, scores=None)

Create anomaly mask visualizations with GT/pred comparison.

Parameters:

Name Type Description Default
decisions Tensor

Binary anomaly decisions [B, H, W, 1]

required
mask Tensor | None

Ground truth anomaly mask [B, H, W, 1] (optional)

None
cube Tensor

Original cube [B, H, W, C] for visualization

required
context Context

Execution context with stage, epoch, batch_idx

required

Returns:

Type Description
dict

Dictionary with "artifacts" key containing list of Artifact objects

Source code in cuvis_ai/node/visualizations.py
def forward(
    self,
    decisions: torch.Tensor,
    cube: torch.Tensor,
    context: Context,
    mask: torch.Tensor | None = None,
    scores: torch.Tensor | None = None,
) -> dict:
    """Create anomaly mask visualizations with GT/pred comparison.

    Parameters
    ----------
    decisions : torch.Tensor
        Binary anomaly decisions [B, H, W, 1]
    mask : torch.Tensor | None
        Ground truth anomaly mask [B, H, W, 1] (optional)
    cube : torch.Tensor
        Original cube [B, H, W, C] for visualization
    context : Context
        Execution context with stage, epoch, batch_idx

    Returns
    -------
    dict
        Dictionary with "artifacts" key containing list of Artifact objects
    """
    # Extract context information
    stage = context.stage.value
    epoch = context.epoch
    batch_idx = context.batch_idx

    # Use decisions directly (already binary)
    pred_mask = decisions.float()

    # Convert to numpy and squeeze channel dimension
    pred_mask_np = pred_mask.detach().cpu().numpy().squeeze(-1)  # [B, H, W]
    cube_np = cube.detach().cpu().numpy()  # [B, H, W, C]

    # Determine if we should use ground truth
    # Skip GT comparison if: mask not provided, inference stage, or mask is all zeros
    use_gt = (
        mask is not None and context.stage != ExecutionStage.INFERENCE and mask.any().item()
    )

    # Process ground truth mask if available
    gt_mask_np = None
    batch_iou = None
    if use_gt:
        gt_mask_np = mask.detach().cpu().numpy().squeeze(-1)  # [B, H, W]

        # Add binary mask assertion
        unique_values = np.unique(gt_mask_np)
        if not np.all(np.isin(unique_values, [0, 1, True, False])):
            raise ValueError(
                f"AnomalyMask expects binary masks with only values {{0, 1}}. "
                f"Found unique values: {unique_values}. "
                f"Ensure LentilsAnomolyDataNode is configured with anomaly_class_ids "
                f"to convert multi-class masks to binary."
            )

        # Compute batch-level IoU (matches AnomalyDetectionMetrics computation)
        batch_gt = gt_mask_np > 0.5  # [B, H, W] bool
        batch_pred = pred_mask_np > 0.5  # [B, H, W] bool
        batch_tp = np.logical_and(batch_pred, batch_gt).sum()
        batch_fp = np.logical_and(batch_pred, ~batch_gt).sum()
        batch_fn = np.logical_and(~batch_pred, batch_gt).sum()
        batch_iou = batch_tp / (batch_tp + batch_fp + batch_fn + 1e-8)

    # Determine how many images to visualize from this batch
    batch_size = pred_mask_np.shape[0]
    up_to_batch = batch_size if self.up_to is None else min(batch_size, self.up_to)

    # List to collect artifacts
    artifacts = []

    # Loop through each image in the batch up to the limit
    for i in range(up_to_batch):
        # Get predicted mask for this image
        pred = pred_mask_np[i] > 0.5  # [H, W] bool

        # Get cube channel for visualization
        cube_img = cube_np[i]  # [H, W, C]
        cube_channel = cube_img[:, :, self.channel]

        # Normalize cube channel to [0, 1] for display
        cube_norm = (cube_channel - cube_channel.min()) / (
            cube_channel.max() - cube_channel.min() + 1e-8
        )

        if use_gt:
            # Mode A: Full comparison with ground truth
            assert gt_mask_np is not None, "gt_mask_np should not be None when use_gt is True"
            gt = gt_mask_np[i] > 0.5  # [H, W] bool

            # Compute confusion matrix
            tp = np.logical_and(pred, gt)  # True Positives
            fp = np.logical_and(pred, ~gt)  # False Positives
            fn = np.logical_and(~pred, gt)  # False Negatives
            # Compute metrics
            tp_count = tp.sum()
            fp_count = fp.sum()
            fn_count = fn.sum()

            precision = tp_count / (tp_count + fp_count + 1e-8)
            recall = tp_count / (tp_count + fn_count + 1e-8)
            iou = tp_count / (tp_count + fp_count + fn_count + 1e-8)

            # Create figure with 3 subplots
            fig, axes = plt.subplots(1, 3, figsize=(18, 6))

            # Subplot 1: Ground truth mask
            axes[0].imshow(gt, cmap="gray", aspect="auto")
            axes[0].set_title("Ground Truth Mask")
            axes[0].set_xlabel("Width")
            axes[0].set_ylabel("Height")

            # Subplot 2: Cube with TP/FP/FN overlay
            per_image_ap = None
            if scores is not None:
                raw_scores = scores[i, ..., 0]
                probs = torch.sigmoid(raw_scores).flatten()
                target_tensor = mask[i, ..., 0].flatten().to(dtype=torch.long)
                if probs.numel() == target_tensor.numel():
                    per_image_ap = binary_average_precision(probs, target_tensor).item()

            axes[1].imshow(cube_norm, cmap="gray", aspect="auto")

            # Create color overlay
            overlay = np.zeros((*gt.shape, 4))
            overlay[tp] = [0, 1, 0, 0.6]  # Green: True Positives
            overlay[fp] = [1, 0, 0, 0.6]  # Red: False Positives
            overlay[fn] = [1, 1, 0, 0.6]  # Yellow: False Negatives
            # TN pixels remain transparent (no overlay)

            overlay_title = f"Overlay (Channel {self.channel}) - IoU: {iou:.3f}"
            if per_image_ap is not None:
                overlay_title += f" | AP: {per_image_ap:.3f}"
            overlay_title += "\nGreen=TP, Red=FP, Yellow=FN"

            axes[1].imshow(overlay, aspect="auto")
            axes[1].set_title(overlay_title)
            axes[1].set_xlabel("Width")
            axes[1].set_ylabel("Height")

            # Subplot 3: Predicted mask with metrics in title
            axes[2].imshow(pred, cmap="gray", aspect="auto")

            # Add metrics as title (smaller font)
            metrics_title = (
                f"Predicted Mask\nIoU: {iou:.4f} | Prec: {precision:.4f} | Rec: {recall:.4f}"
            )
            if per_image_ap is not None:
                metrics_title += f" | AP: {per_image_ap:.4f}"
            metrics_title += f"\nBatch IoU: {batch_iou:.4f} (all {batch_size} imgs) | Ch: {self.channel}/{cube_img.shape[2]}"
            axes[2].set_title(metrics_title, fontsize=9)
            axes[2].set_xlabel("Width")
            axes[2].set_ylabel("Height")

            log_msg = f"Created anomaly mask artifact ({i + 1}/{up_to_batch}): IoU: {iou:.3f}"
        else:
            # Mode B: Prediction-only visualization (no ground truth)
            # Create figure with 2 subplots
            fig, axes = plt.subplots(1, 2, figsize=(12, 6))

            # Subplot 1: Cube with predicted overlay
            axes[0].imshow(cube_norm, cmap="gray", aspect="auto")

            # Create prediction overlay (cyan for predicted anomalies)
            overlay = np.zeros((*pred.shape, 4))
            overlay[pred] = [0, 1, 1, 0.6]  # Cyan: Predicted anomalies

            axes[0].imshow(overlay, aspect="auto")
            axes[0].set_title(
                f"Prediction Overlay (Channel {self.channel})\nCyan=Predicted Anomalies"
            )
            axes[0].set_xlabel("Width")
            axes[0].set_ylabel("Height")

            # Subplot 2: Predicted mask
            axes[1].imshow(pred, cmap="gray", aspect="auto")
            axes[1].set_title("Predicted Mask")
            axes[1].set_xlabel("Width")
            axes[1].set_ylabel("Height")

            # Add statistics as text
            pred_pixels = pred.sum()
            total_pixels = pred.size
            pred_ratio = pred_pixels / total_pixels

            stats_text = (
                f"Prediction Stats:\n"
                f"Anomaly pixels: {pred_pixels}\n"
                f"Total pixels: {total_pixels}\n"
                f"Anomaly ratio: {pred_ratio:.4f}\n"
                f"\n"
                f"Channel: {self.channel}/{cube_img.shape[2]}\n"
                f"\n"
                f"Mode: Inference/No GT"
            )

            fig.text(
                0.98,
                0.5,
                stats_text,
                ha="left",
                va="center",
                bbox={
                    "boxstyle": "round",
                    "facecolor": "lightblue",
                    "alpha": 0.5,
                },
                fontfamily="monospace",
            )

            log_msg = (
                f"Created anomaly mask artifact ({i + 1}/{up_to_batch}): prediction-only mode"
            )

        # Add main title with epoch/batch info
        fig.suptitle(
            f"Anomaly Mask Visualization - {stage} E{epoch} B{batch_idx} Img{i}",
            fontsize=14,
            fontweight="bold",
        )

        plt.tight_layout()

        # Convert figure to numpy array (RGB format)
        img_array = fig_to_array(fig, dpi=150)

        # Create Artifact object
        artifact = Artifact(
            name=f"anomaly_mask_img{i:02d}",
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
            value=img_array,
            el_id=i,
            desc=f"Anomaly mask for {stage} epoch {epoch}, batch {batch_idx}, image {i}",
            type=ArtifactType.IMAGE,
        )
        artifacts.append(artifact)

        logger.info(log_msg)

        plt.close(fig)

    # Return artifacts
    return {"artifacts": artifacts}
ScoreHeatmapVisualizer
ScoreHeatmapVisualizer(
    normalize_scores=True, cmap="inferno", up_to=5, **kwargs
)

Bases: Node

Log LAD/RX score heatmaps as TensorBoard artifacts.

Source code in cuvis_ai/node/visualizations.py
def __init__(
    self,
    normalize_scores: bool = True,
    cmap: str = "inferno",
    up_to: int | None = 5,
    **kwargs,
) -> None:
    self.normalize_scores = normalize_scores
    self.cmap = cmap
    self.up_to = up_to
    super().__init__(
        execution_stages={ExecutionStage.VAL, ExecutionStage.TEST, ExecutionStage.INFERENCE},
        normalize_scores=normalize_scores,
        cmap=cmap,
        up_to=up_to,
        **kwargs,
    )
forward
forward(scores, context)

Generate heatmap visualizations of anomaly scores.

Creates color-mapped heatmaps of anomaly scores for visualization in TensorBoard. Optionally normalizes scores to [0, 1] range for consistent visualization across batches.

Parameters:

Name Type Description Default
scores Tensor

Anomaly scores [B, H, W, 1] from detection nodes (e.g., RX, LAD).

required
context Context

Execution context with stage, epoch, batch_idx information.

required

Returns:

Type Description
dict[str, list[Artifact]]

Dictionary with "artifacts" key containing list of heatmap artifacts.

Source code in cuvis_ai/node/visualizations.py
def forward(self, scores: torch.Tensor, context: Context) -> dict[str, list[Artifact]]:
    """Generate heatmap visualizations of anomaly scores.

    Creates color-mapped heatmaps of anomaly scores for visualization
    in TensorBoard. Optionally normalizes scores to [0, 1] range for
    consistent visualization across batches.

    Parameters
    ----------
    scores : Tensor
        Anomaly scores [B, H, W, 1] from detection nodes (e.g., RX, LAD).
    context : Context
        Execution context with stage, epoch, batch_idx information.

    Returns
    -------
    dict[str, list[Artifact]]
        Dictionary with "artifacts" key containing list of heatmap artifacts.
    """
    artifacts: list[Artifact] = []
    batch_limit = scores.shape[0] if self.up_to is None else min(scores.shape[0], self.up_to)

    for idx in range(batch_limit):
        score_map = scores[idx, ..., 0].detach().cpu().numpy()

        if self.normalize_scores:
            min_v = float(score_map.min())
            max_v = float(score_map.max())
            if max_v - min_v > 1e-9:
                score_map = (score_map - min_v) / (max_v - min_v)
            else:
                score_map = np.zeros_like(score_map)

        fig, ax = plt.subplots(1, 1, figsize=(4, 4))
        im = ax.imshow(score_map, cmap=self.cmap)
        ax.set_title(f"Score Heatmap #{idx}")
        ax.axis("off")
        fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
        img_array = fig_to_array(fig, dpi=150)
        plt.close(fig)

        artifact = Artifact(
            name=f"score_heatmap_img{idx:02d}",
            value=img_array,
            el_id=idx,
            desc="Anomaly score heatmap",
            type=ArtifactType.IMAGE,
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
        )
        artifacts.append(artifact)

    return {"artifacts": artifacts}
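A hedged wiring sketch mirroring the other visualizers; `rx.scores` stands in for whatever score output your detection node exposes:

>>> heatmap_viz = ScoreHeatmapVisualizer(cmap="inferno", up_to=5)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (rx.scores, heatmap_viz.scores),
...     (heatmap_viz.artifacts, tensorboard_node.artifacts),
... )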
RGBAnomalyMask
RGBAnomalyMask(up_to=None, **kwargs)

Bases: Node

Visualize anomaly detection with GT and predicted masks on RGB images.

Similar to AnomalyMask but designed for RGB images (e.g., from band selectors). Creates side-by-side visualizations showing ground truth masks, predicted masks, and overlay comparisons on RGB images. The overlay shows:

- Green: True Positives (correct anomaly detection)
- Red: False Positives (false alarms)
- Yellow: False Negatives (missed anomalies)

Also displays IoU and other metrics. Returns a list of Artifact objects for logging to monitoring systems.

Executes during validation, test, and inference stages.

Parameters:

Name Type Description Default
up_to int

Maximum number of images to visualize. If None, visualizes all (default: None)

None

Examples:

>>> decider = BinaryDecider(threshold=0.2)
>>> viz_mask = RGBAnomalyMask(up_to=5)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (decider.decisions, viz_mask.decisions),
...     (data_node.mask, viz_mask.mask),
...     (band_selector.rgb_image, viz_mask.rgb_image),
...     (viz_mask.artifacts, tensorboard_node.artifacts),
... )

Initialize RGBAnomalyMask visualizer.

Parameters:

Name Type Description Default
up_to int | None

Maximum number of images to visualize. If None, visualizes all (default: None)

None
Source code in cuvis_ai/node/visualizations.py
def __init__(self, up_to: int | None = None, **kwargs) -> None:
    """Initialize RGBAnomalyMask visualizer.

    Parameters
    ----------
    up_to : int | None, optional
        Maximum number of images to visualize. If None, visualizes all (default: None)
    """
    self.up_to = up_to
    super().__init__(
        execution_stages={ExecutionStage.VAL, ExecutionStage.TEST, ExecutionStage.INFERENCE},
        up_to=up_to,
        **kwargs,
    )
forward
forward(
    decisions,
    rgb_image,
    mask=None,
    context=None,
    scores=None,
)

Create anomaly mask visualizations with GT/pred comparison on RGB images.

Parameters:

Name Type Description Default
decisions Tensor

Binary anomaly decisions [B, H, W, 1]

required
rgb_image Tensor

RGB image [B, H, W, 3] for visualization

required
mask Tensor | None

Ground truth anomaly mask [B, H, W, 1] (optional)

None
context Context | None

Execution context with stage, epoch, batch_idx

None
scores Tensor | None

Optional anomaly logits/scores [B, H, W, 1]

None

Returns:

Type Description
dict

Dictionary with "artifacts" key containing list of Artifact objects

Source code in cuvis_ai/node/visualizations.py
def forward(
    self,
    decisions: torch.Tensor,
    rgb_image: torch.Tensor,
    mask: torch.Tensor | None = None,
    context: Context | None = None,
    scores: torch.Tensor | None = None,
) -> dict:
    """Create anomaly mask visualizations with GT/pred comparison on RGB images.

    Parameters
    ----------
    decisions : torch.Tensor
        Binary anomaly decisions [B, H, W, 1]
    rgb_image : torch.Tensor
        RGB image [B, H, W, 3] for visualization
    mask : torch.Tensor | None
        Ground truth anomaly mask [B, H, W, 1] (optional)
    context : Context | None
        Execution context with stage, epoch, batch_idx
    scores : torch.Tensor | None
        Optional anomaly logits/scores [B, H, W, 1]

    Returns
    -------
    dict
        Dictionary with "artifacts" key containing list of Artifact objects
    """
    if context is None:
        raise ValueError("RGBAnomalyMask.forward() requires a Context object")

    # Convert to numpy only at this point (keep on device until last moment)
    pred_mask_np: np.ndarray = tensor_to_numpy(decisions.float().squeeze(-1))  # [B, H, W]
    rgb_np: np.ndarray = tensor_to_numpy(rgb_image)  # [B, H, W, 3]

    # Normalize RGB to [0, 1]
    if rgb_np.max() > 1.0:
        rgb_np = rgb_np / 255.0
    rgb_np = np.clip(rgb_np, 0.0, 1.0)

    # Check if GT available and valid
    use_gt = (
        mask is not None and context.stage != ExecutionStage.INFERENCE and mask.any().item()
    )

    # Validate and convert GT if available
    gt_mask_np: np.ndarray | None = None
    batch_iou: float | None = None
    if use_gt:
        assert mask is not None
        gt_mask_np = tensor_to_numpy(mask.squeeze(-1))  # [B, H, W]
        unique_values = np.unique(gt_mask_np)
        if not np.all(np.isin(unique_values, [0, 1, True, False])):
            raise ValueError(f"RGBAnomalyMask expects binary masks, found: {unique_values}")
        # Compute batch IoU
        batch_pred = pred_mask_np > 0.5
        batch_gt = gt_mask_np > 0.5
        tp = np.logical_and(batch_pred, batch_gt).sum()
        batch_iou = float(
            tp
            / (
                tp
                + np.logical_and(batch_pred, ~batch_gt).sum()
                + np.logical_and(~batch_pred, batch_gt).sum()
                + 1e-8
            )
        )

    batch_size = pred_mask_np.shape[0]
    up_to_batch = min(batch_size, self.up_to or batch_size)
    artifacts = []

    # Loop through images and visualize
    for i in range(up_to_batch):
        pred = pred_mask_np[i] > 0.5
        rgb_img = rgb_np[i]
        gt = gt_mask_np[i] > 0.5 if gt_mask_np is not None else None

        # Compute metrics and AP if GT available
        metrics: dict | None = None
        per_image_ap: float | None = None
        if gt is not None:
            metrics = self._compute_metrics(pred, gt)
            if scores is not None and mask is not None:
                raw_scores = scores[i, ..., 0]
                probs = torch.sigmoid(raw_scores).flatten()
                target_tensor = mask[i, ..., 0].flatten().to(dtype=torch.long)
                if probs.device != target_tensor.device:
                    target_tensor = target_tensor.to(probs.device)
                if probs.numel() == target_tensor.numel():
                    per_image_ap = binary_average_precision(probs, target_tensor).item()

        # Create figure and plot
        ncols = 3 if gt is not None else 2
        fig, axes = plt.subplots(1, ncols, figsize=(6 * ncols, 6))
        if ncols == 1:
            axes = [axes]

        if gt is not None and metrics is not None and batch_iou is not None:
            self._plot_with_gt(
                axes, rgb_img, pred, gt, metrics, batch_iou, batch_size, per_image_ap
            )
            log_msg = (
                f"Created RGB anomaly mask ({i + 1}/{up_to_batch}): IoU={metrics['iou']:.3f}"
            )
        else:
            self._plot_no_gt(axes, rgb_img, pred)
            log_msg = f"Created RGB anomaly mask ({i + 1}/{up_to_batch}) (no GT)"

        plt.tight_layout()
        img_array = fig_to_array(fig, dpi=150)
        plt.close(fig)

        artifact = Artifact(
            name=f"rgb_anomaly_mask_img{i:02d}",
            value=img_array,
            el_id=i,
            desc=log_msg,
            type=ArtifactType.IMAGE,
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
        )
        artifacts.append(artifact)

    return {"artifacts": artifacts}

TensorBoard Visualization

drcnn_tensorboard_viz

TensorBoard visualization node for DRCNN-AdaClip training.

This node creates image artifacts for TensorBoard logging to visualize:

- Input HSI cube (false-color RGB visualization)
- Mixer output (what AdaClip sees as input)
- Ground truth anomaly masks
- AdaClip anomaly scores (as heatmap)

DRCNNTensorBoardViz
DRCNNTensorBoardViz(
    hsi_channels=None,
    max_samples=4,
    log_every_n_batches=1,
    **kwargs,
)

Bases: Node

TensorBoard visualization node for DRCNN-AdaClip pipeline.

Creates image artifacts for logging to TensorBoard:

- Input HSI cube visualization (false-color RGB from selected channels)
- Mixer output (3-channel RGB-like image that AdaClip sees)
- Ground truth anomaly mask
- AdaClip anomaly scores (as heatmap)

Parameters:

Name Type Description Default
hsi_channels list[int]

Channel indices to use for false-color RGB visualization of HSI input (default: [0, 20, 40] for a simple false-color representation)

None
max_samples int

Maximum number of samples to log per batch (default: 4)

4
log_every_n_batches int

Log images every N batches to reduce TensorBoard size (default: 1, log every batch)

1
Source code in cuvis_ai/node/drcnn_tensorboard_viz.py
def __init__(
    self,
    hsi_channels: list[int] | None = None,
    max_samples: int = 4,
    log_every_n_batches: int = 1,
    **kwargs,
) -> None:
    if hsi_channels is None:
        hsi_channels = [0, 20, 40]  # Default: use channels 0, 20, 40 for false-color RGB
    self.hsi_channels = hsi_channels
    self.max_samples = max_samples
    self.log_every_n_batches = log_every_n_batches
    self._batch_counter = 0

    super().__init__(
        execution_stages={ExecutionStage.TRAIN, ExecutionStage.VAL, ExecutionStage.TEST},
        hsi_channels=hsi_channels,
        max_samples=max_samples,
        log_every_n_batches=log_every_n_batches,
        **kwargs,
    )
forward
forward(
    hsi_cube,
    mixer_output,
    ground_truth_mask,
    adaclip_scores,
    context=None,
    **_,
)

Create image artifacts for TensorBoard logging.

Parameters:

Name Type Description Default
hsi_cube Tensor

Input HSI cube [B, H, W, C]

required
mixer_output Tensor

Mixer output (RGB-like) [B, H, W, 3]

required
ground_truth_mask Tensor

Ground truth anomaly mask [B, H, W, 1]

required
adaclip_scores Tensor

AdaClip anomaly scores [B, H, W, 1]

required
context Context

Execution context with stage, epoch, batch_idx info

None

Returns:

Type Description
dict[str, list[Artifact]]

Dictionary with "artifacts" key containing list of Artifact objects

Source code in cuvis_ai/node/drcnn_tensorboard_viz.py
def forward(
    self,
    hsi_cube: Tensor,
    mixer_output: Tensor,
    ground_truth_mask: Tensor,
    adaclip_scores: Tensor,
    context: Context | None = None,
    **_: Any,
) -> dict[str, list[Artifact]]:
    """Create image artifacts for TensorBoard logging.

    Parameters
    ----------
    hsi_cube : Tensor
        Input HSI cube [B, H, W, C]
    mixer_output : Tensor
        Mixer output (RGB-like) [B, H, W, 3]
    ground_truth_mask : Tensor
        Ground truth anomaly mask [B, H, W, 1]
    adaclip_scores : Tensor
        AdaClip anomaly scores [B, H, W, 1]
    context : Context, optional
        Execution context with stage, epoch, batch_idx info

    Returns
    -------
    dict[str, list[Artifact]]
        Dictionary with "artifacts" key containing list of Artifact objects
    """
    if context is None:
        context = Context()

    # Skip logging if not the right batch interval
    self._batch_counter += 1
    if (self._batch_counter - 1) % self.log_every_n_batches != 0:
        return {"artifacts": []}

    artifacts = []
    B = hsi_cube.shape[0]
    num_samples = min(B, self.max_samples)

    # Convert tensors to numpy for visualization
    hsi_np = hsi_cube.detach().cpu().numpy()
    mixer_np = mixer_output.detach().cpu().numpy()
    mask_np = ground_truth_mask.detach().cpu().numpy()
    scores_np = adaclip_scores.detach().cpu().numpy()

    for b in range(num_samples):
        # 1. HSI Input Visualization (false-color RGB)
        hsi_img = self._create_hsi_visualization(hsi_np[b])
        artifact = Artifact(
            name=f"hsi_input_sample_{b}",
            value=hsi_img,
            el_id=b,
            desc=f"HSI input (false-color RGB) for sample {b}",
            type=ArtifactType.IMAGE,
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
        )
        artifacts.append(artifact)

        # 2. Mixer Output (what AdaClip sees as input)
        mixer_img = self._normalize_image(mixer_np[b])  # Already [H, W, 3]
        artifact = Artifact(
            name=f"mixer_output_adaclip_input_sample_{b}",
            value=mixer_img,
            el_id=b,
            desc=f"Mixer output (AdaClip input) for sample {b}",
            type=ArtifactType.IMAGE,
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
        )
        artifacts.append(artifact)

        # 3. Ground Truth Mask
        mask_img = self._create_mask_visualization(mask_np[b])  # [H, W, 1] -> [H, W, 3]
        artifact = Artifact(
            name=f"ground_truth_mask_sample_{b}",
            value=mask_img,
            el_id=b,
            desc=f"Ground truth anomaly mask for sample {b}",
            type=ArtifactType.IMAGE,
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
        )
        artifacts.append(artifact)

        # 4. AdaClip Scores (as heatmap)
        scores_img = self._create_scores_heatmap(scores_np[b])  # [H, W, 1] -> [H, W, 3]
        artifact = Artifact(
            name=f"adaclip_scores_heatmap_sample_{b}",
            value=scores_img,
            el_id=b,
            desc=f"AdaClip anomaly scores (heatmap) for sample {b}",
            type=ArtifactType.IMAGE,
            stage=context.stage,
            epoch=context.epoch,
            batch_idx=context.batch_idx,
        )
        artifacts.append(artifact)

    return {"artifacts": artifacts}
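A hedged wiring sketch; the four input port names follow the forward() signature, while `data_node`, `mixer`, and `adaclip` (and their output port names) are illustrative:

>>> viz = DRCNNTensorBoardViz(hsi_channels=[0, 20, 40], max_samples=4)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (data_node.cube, viz.hsi_cube),
...     (mixer.rgb, viz.mixer_output),
...     (data_node.mask, viz.ground_truth_mask),
...     (adaclip.scores, viz.adaclip_scores),
...     (viz.artifacts, tensorboard_node.artifacts),
... )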

Monitor

monitor

TensorBoard Monitoring Nodes.

This module provides nodes for logging artifacts and metrics to TensorBoard during pipeline execution. The monitoring nodes are sink nodes that accept artifacts (visualizations) and metrics from upstream nodes and write them to TensorBoard logs for visualization and analysis.

The primary use case is logging training and validation metrics, along with visualizations like heatmaps, RGB renderings, and PCA plots during model training.

See Also

cuvis_ai.node.visualizations : Nodes that generate artifacts for monitoring

TensorBoardMonitorNode
TensorBoardMonitorNode(
    output_dir="./runs",
    run_name=None,
    comment="",
    flush_secs=120,
    **kwargs,
)

Bases: Node

TensorBoard monitoring node for logging artifacts and metrics.

This is a SINK node that logs visualizations (artifacts) and metrics to TensorBoard. Accepts optional inputs for artifacts and metrics, allowing predecessors to be filtered by execution_stage without causing errors.

Executes during all stages (ALWAYS).

Parameters:

Name Type Description Default
output_dir str

Directory for TensorBoard logs (default: "./runs")

'./runs'
run_name str | None

Optional run name used when resolving the TensorBoard log directory (default: None)

None
comment str

Comment to append to log directory name (default: "")

''
flush_secs int

How often to flush pending events to disk (default: 120)

120

Examples:

>>> heatmap_viz = AnomalyHeatmap(cmap='hot', up_to=10)
>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> graph.connect(
...     (heatmap_viz.artifacts, tensorboard_node.artifacts),
... )
Source code in cuvis_ai/node/monitor.py
def __init__(
    self,
    output_dir: str = "./runs",
    run_name: str | None = None,
    comment: str = "",
    flush_secs: int = 120,
    **kwargs,
) -> None:
    self.output_dir = Path(output_dir)
    self.run_name = run_name
    self.comment = comment
    self.flush_secs = flush_secs
    self._writer = None
    self._tensorboard_available = False

    super().__init__(
        execution_stages={ExecutionStage.ALWAYS},
        output_dir=str(output_dir),
        run_name=run_name,
        comment=comment,
        flush_secs=flush_secs,
        **kwargs,
    )

    # Keep a reference to the SummaryWriter class
    self._SummaryWriter = SummaryWriter

    # Determine the log directory with run name
    self.log_dir = self._resolve_log_dir()

    # Initialize TensorBoard writer
    self.log_dir.mkdir(parents=True, exist_ok=True)
    self._writer = self._SummaryWriter(
        log_dir=str(self.log_dir),
        comment=self.comment,
        flush_secs=self.flush_secs,
    )
    logger.info(f"TensorBoard writer initialized: {self.log_dir}")
    logger.info(f"To view visualizations, run: uv run tensorboard --logdir={self.output_dir}")
forward
forward(artifacts=None, metrics=None, context=None)

Log artifacts and metrics to TensorBoard.

Parameters:

Name Type Description Default
context Context

Execution context with stage, epoch, batch_idx, global_step

None
artifacts list[Artifact]

List of artifacts to log (default: None)

None
metrics list[Metric]

List of metrics to log (default: None)

None

Returns:

Type Description
dict

Empty dict (sink node has no outputs)

Source code in cuvis_ai/node/monitor.py
def forward(
    self,
    artifacts: list[Artifact] | None = None,
    metrics: list[Metric] | None = None,
    context: Context | None = None,
) -> dict:
    """Log artifacts and metrics to TensorBoard.

    Parameters
    ----------
    context : Context
        Execution context with stage, epoch, batch_idx, global_step
    artifacts : list[Artifact], optional
        List of artifacts to log (default: None)
    metrics : list[Metric], optional
        List of metrics to log (default: None)

    Returns
    -------
    dict
        Empty dict (sink node has no outputs)
    """
    if context is None:
        context = Context()

    stage = context.stage.value
    step = context.global_step

    # Flatten artifacts if it's a list of lists (variadic port)
    if artifacts is not None:
        if (
            isinstance(artifacts, list)
            and len(artifacts) > 0
            and isinstance(artifacts[0], list)
        ):
            artifacts = [item for sublist in artifacts for item in sublist]

    # Log artifacts
    if artifacts is not None:
        for artifact in artifacts:
            self._log_artifact(artifact, stage, step)
        logger.debug(f"Logged {len(artifacts)} artifacts to TensorBoard at step {step}")

    # Flatten metrics if variadic input provided
    if (
        metrics is not None
        and isinstance(metrics, list)
        and metrics
        and isinstance(metrics[0], list)
    ):
        metrics = [item for sublist in metrics for item in sublist]

    # Log metrics
    if metrics is not None:
        for metric in metrics:
            self._log_metric(metric, stage, step)
        logger.debug(f"Logged {len(metrics)} metrics to TensorBoard at step {step}")

    return {}
log
log(name, value, step)

Log a scalar value to TensorBoard.

This method provides a simple interface for external trainers to log metrics directly, complementing the port-based logging. Used by GradientTrainer to log train/val losses to the same TensorBoard directory as graph metrics and artifacts.

Parameters:

Name Type Description Default
name str

Name/tag for the scalar (e.g., "train/loss", "val/accuracy")

required
value float

Scalar value to log

required
step int

Global step number

required

Examples:

>>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
>>> # From external trainer
>>> tensorboard_node.log("train/loss", 0.5, step=100)
Source code in cuvis_ai/node/monitor.py
def log(self, name: str, value: float, step: int) -> None:
    """Log a scalar value to TensorBoard.

    This method provides a simple interface for external trainers
    to log metrics directly, complementing the port-based logging.
    Used by GradientTrainer to log train/val losses to the same
    TensorBoard directory as graph metrics and artifacts.

    Parameters
    ----------
    name : str
        Name/tag for the scalar (e.g., "train/loss", "val/accuracy")
    value : float
        Scalar value to log
    step : int
        Global step number

    Examples
    --------
    >>> tensorboard_node = TensorBoardMonitorNode(output_dir="./runs")
    >>> # From external trainer
    >>> tensorboard_node.log("train/loss", 0.5, step=100)
    """
    self._writer.add_scalar(name, value, step)

Label Processing

Nodes for label conversion and manipulation.

Labels

labels

Label Mapping Nodes.

This module provides nodes for converting multi-class segmentation masks to binary anomaly labels. These nodes are useful when training with datasets that have multi-class annotations but the task requires binary anomaly detection.

The main node remaps class IDs to binary labels (0=normal, 1=anomaly) based on configurable normal and anomaly class ID lists.

See Also

cuvis_ai.deciders : Binary decision nodes for threshold-based classification

BinaryAnomalyLabelMapper
BinaryAnomalyLabelMapper(
    normal_class_ids, anomaly_class_ids=None, **kwargs
)

Bases: Node

Convert multi-class segmentation masks to binary anomaly targets.

Masks are remapped to binary labels (0/False = normal, 1/True = anomaly); the forward pass returns the remapped mask as a torch.bool tensor.

Parameters:

Name Type Description Default
normal_class_ids Iterable[int]

Class IDs that should be considered normal.

required
anomaly_class_ids Iterable[int] | None

Explicit anomaly IDs. When None, all IDs not in normal_class_ids are treated as anomalies. When provided, only these IDs are treated as anomalies and all others (including those not in normal_class_ids) are treated as normal.

None
Source code in cuvis_ai/node/labels.py
def __init__(
    self,
    normal_class_ids: Iterable[int],
    anomaly_class_ids: Iterable[int] | None = None,
    **kwargs,
) -> None:
    self.normal_class_ids = tuple(int(c) for c in normal_class_ids)
    self.anomaly_class_ids = (
        tuple(int(c) for c in anomaly_class_ids) if anomaly_class_ids is not None else None
    )

    # Validate that there are no overlaps between normal and anomaly class IDs
    if self.anomaly_class_ids is not None:
        overlap = set(self.normal_class_ids) & set(self.anomaly_class_ids)
        if overlap:
            raise ValueError(
                f"Overlap detected between normal_class_ids and anomaly_class_ids: {overlap}. "
                "Class IDs cannot be both normal and anomaly."
            )

        # Check for gaps in coverage and issue warning
        all_specified_ids = set(self.normal_class_ids) | set(self.anomaly_class_ids)
        max_id = max(all_specified_ids) if all_specified_ids else 0

        # Find gaps (missing class IDs)
        expected_ids = set(range(max_id + 1))
        gaps = expected_ids - all_specified_ids

        if gaps:
            warnings.warn(
                f"Gap detected in class ID coverage. The following class IDs are not specified "
                f"in either normal_class_ids or anomaly_class_ids: {gaps}. "
                f"These will be treated as normal classes. To specify all classes explicitly, "
                f"include them in normal_class_ids or anomaly_class_ids.",
                UserWarning,
                stacklevel=2,
            )
            # Add gaps to normal_class_ids as requested
            self.normal_class_ids = tuple(sorted(set(self.normal_class_ids) | gaps))

    self._target_dtype = torch.long

    super().__init__(
        normal_class_ids=self.normal_class_ids,
        anomaly_class_ids=self.anomaly_class_ids,
        **kwargs,
    )
forward
forward(cube, mask, **_)

Map multi-class labels to binary anomaly labels.

Parameters:

Name Type Description Default
cube Tensor

Features/scores to pass through [B, H, W, C]

required
mask Tensor

Multi-class segmentation masks [B, H, W, 1]

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "cube" (pass-through) and "mask" (binary bool) keys

Source code in cuvis_ai/node/labels.py
def forward(self, cube: Tensor, mask: Tensor, **_: Any) -> dict[str, Tensor]:
    """Map multi-class labels to binary anomaly labels.

    Parameters
    ----------
    cube : Tensor
        Features/scores to pass through [B, H, W, C]
    mask : Tensor
        Multi-class segmentation masks [B, H, W, 1]

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "cube" (pass-through) and "mask" (binary bool) keys
    """
    if self.anomaly_class_ids is not None:
        # Explicit anomaly class IDs: only these are anomalies, rest are normal
        mask_anomaly = self._membership_mask(mask, self.anomaly_class_ids)
    else:
        # Original behavior: normal_class_ids are normal, everything else is anomaly
        mask_normal = self._membership_mask(mask, self.normal_class_ids)
        mask_anomaly = ~mask_normal

    mapped = torch.zeros_like(mask, dtype=self._target_dtype, device=mask.device)
    mapped = torch.where(mask_anomaly, torch.ones_like(mapped), mapped)

    # Convert to bool for smaller tensor size
    mapped = mapped.bool()

    return {"cube": cube, "mask": mapped}
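A short behavioral sketch of the remapping; the tensor values are chosen for illustration (note that leaving class ID 1 unspecified triggers the coverage warning described above and folds it into the normal set):

>>> import torch
>>> mapper = BinaryAnomalyLabelMapper(normal_class_ids=(0, 2), anomaly_class_ids=(3,))
>>> mask = torch.tensor([[[[0], [3]], [[2], [1]]]])  # [1, 2, 2, 1] multi-class IDs
>>> cube = torch.randn(1, 2, 2, 5)
>>> out = mapper.forward(cube=cube, mask=mask)
>>> out["mask"].squeeze(-1)
tensor([[[False,  True],
         [False, False]]])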