
Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Statistical Nodes

This category groups the classical statistical anomaly detection and spectral analysis nodes.

RX and LAD

rx_detector

RX anomaly detection nodes for hyperspectral imaging.

This module implements the Reed-Xiaoli (RX) anomaly detection algorithm, a widely used statistical method for detecting anomalies in hyperspectral images. The RX algorithm computes the squared Mahalanobis distance of each pixel spectrum from the background distribution, treating pixels with large distances as potential anomalies.

The module provides two variants:

  • RXGlobal: Uses global statistics (mean, covariance) estimated from training data. Supports two-phase training: statistical initialization followed by optional gradient-based fine-tuning via unfreeze().

  • RXPerBatch: Computes statistics independently for each batch on-the-fly without requiring initialization. Useful for real-time processing or when training data is unavailable.

Reference: Reed, I. S., & Yu, X. (1990). "Adaptive multiple-band CFAR detection of an optical pattern with unknown spectral distribution." IEEE Transactions on Acoustics, Speech, and Signal Processing, 38(10), 1760-1770.

RXBase

RXBase(eps=1e-06, **kwargs)

Bases: Node

Base class for RX anomaly detectors.

Source code in cuvis_ai/anomaly/rx_detector.py
def __init__(self, eps: float = 1e-6, **kwargs) -> None:
    self.eps = eps
    super().__init__(eps=eps, **kwargs)

RXGlobal

RXGlobal(
    num_channels, eps=1e-06, cache_inverse=True, **kwargs
)

Bases: RXBase

RX anomaly detector with global background statistics.

Uses global mean (μ) and covariance (Σ) estimated from training data to compute Mahalanobis distance scores. Supports two-phase training: statistical initialization followed by optional gradient-based fine-tuning.

The detector computes anomaly scores as:

RX(x) = (x - μ)ᵀ Σ⁻¹ (x - μ)

where x is a pixel spectrum, μ is the background mean, and Σ is the covariance matrix.
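The score above can be sketched in plain NumPy on toy data (illustrative only; the node itself operates on torch tensors in BHWC format):

```python
import numpy as np

def rx_scores(cube, mu, cov_inv):
    """Squared Mahalanobis distance per pixel; cube has shape (H, W, C)."""
    Xc = cube.reshape(-1, cube.shape[-1]) - mu           # center each spectrum
    md2 = np.einsum("nc,cd,nd->n", Xc, cov_inv, Xc)      # (x - mu)^T Sigma^-1 (x - mu)
    return md2.reshape(cube.shape[:2])

rng = np.random.default_rng(0)
background = rng.normal(0.0, 1.0, size=(32, 32, 5))      # toy background cube
X = background.reshape(-1, 5)
mu = X.mean(axis=0)
cov = np.cov(X, rowvar=False) + 1e-6 * np.eye(5)         # eps-regularized covariance
cov_inv = np.linalg.pinv(cov)

cube = background.copy()
cube[10, 10] += 8.0                                      # inject a spectral anomaly
scores = rx_scores(cube, mu, cov_inv)
```

The injected pixel receives by far the largest score, since its spectrum lies many standard deviations from the background mean.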

Parameters:

  • num_channels (int): Number of spectral channels in input data. Required.

  • eps (float): Small constant added to covariance diagonal for numerical stability. Default: 1e-06.

  • cache_inverse (bool): If True, precompute and cache Σ⁻¹ for faster inference. Default: True.

  • **kwargs (dict): Additional arguments passed to the Node base class. Default: {}.

Attributes:

  • mu (Tensor or Parameter): Background mean spectrum, shape (C,). Initially a buffer; becomes a Parameter after unfreeze().

  • cov (Tensor or Parameter): Background covariance matrix, shape (C, C).

  • cov_inv (Tensor or Parameter): Cached pseudo-inverse of the covariance (if cache_inverse=True).

  • _statistically_initialized (bool): Flag indicating whether statistical_initialization() has been called.

Examples:

>>> from cuvis_ai.anomaly.rx_detector import RXGlobal
>>> from cuvis_ai_core.training import StatisticalTrainer
>>>
>>> # Create RX detector
>>> rx = RXGlobal(num_channels=61, eps=1.0e-6)
>>>
>>> # Phase 1: Statistical initialization
>>> stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
>>> stat_trainer.fit()  # Computes μ and Σ from training data
>>>
>>> # Inference with frozen statistics
>>> output = rx.forward(data=hyperspectral_cube)
>>> scores = output["scores"]  # [B, H, W, 1]
>>>
>>> # Phase 2: Optional gradient-based fine-tuning
>>> rx.unfreeze()  # Convert buffers to nn.Parameters
>>> # Now μ and Σ can be updated with gradient descent
See Also

  • RXPerBatch: Per-batch RX variant without training.

  • MinMaxNormalizer: Recommended preprocessing before RX.

  • ScoreToLogit: Convert scores to logits for classification.

  • docs/tutorials/rx-statistical.md: Complete RX pipeline tutorial.

Notes

After statistical_initialization(), mu and cov are stored as buffers (frozen by default). Call unfreeze() to convert them to trainable nn.Parameters for gradient-based optimization.

Source code in cuvis_ai/anomaly/rx_detector.py
def __init__(
    self, num_channels: int, eps: float = 1e-6, cache_inverse: bool = True, **kwargs
) -> None:
    self.num_channels = int(num_channels)
    self.eps = eps
    self.cache_inverse = cache_inverse
    # Call Node.__init__ directly with all parameters for proper serialization
    # We bypass RXBase.__init__ since it only accepts eps
    # Node.__init__(self, num_channels=self.num_channels, eps=self.eps, cache_inverse=self.cache_inverse)

    super().__init__(
        num_channels=self.num_channels, eps=self.eps, cache_inverse=self.cache_inverse, **kwargs
    )

    # global stats - all stored as buffers initially
    self.register_buffer("mu", torch.zeros(self.num_channels, dtype=torch.float32))  # (C,)
    self.register_buffer(
        "cov", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float32)
    )  # (C,C)
    self.register_buffer(
        "cov_inv", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float32)
    )  # (C,C)
    self._welford = WelfordAccumulator(self.num_channels, track_covariance=True)
    self._statistically_initialized = False
statistical_initialization
statistical_initialization(input_stream)

Initialize mu and Sigma from data iterator.

Parameters:

  • input_stream (InputStream): Iterator yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"data": tensor} where tensor is BHWC. Required.
Source code in cuvis_ai/anomaly/rx_detector.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize mu and Sigma from data iterator.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is BHWC
    """
    self.reset()
    for batch_data in input_stream:
        # Extract data from port-based dict
        x = batch_data["data"]
        if x is not None:
            self.update(x)

    if self._welford.count <= 1:
        self._statistically_initialized = False
        raise RuntimeError(
            "RXGlobal.statistical_initialization() received insufficient samples. "
            "Expected at least 2 valid pixels."
        )
    self.finalize()
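The streaming pattern behind statistical_initialization() can be sketched as follows (illustrative NumPy; the class name StreamingStats is hypothetical, and the library's actual implementation is the WelfordAccumulator):

```python
import numpy as np

# Welford-style streaming mean and covariance: one pass over batches,
# no need to hold all pixels in memory at once.
class StreamingStats:
    def __init__(self, n_features):
        self.n = 0
        self.mean = np.zeros(n_features)
        self.M2 = np.zeros((n_features, n_features))  # running residual outer products

    def update(self, X):                              # X: (M, C) batch of pixel spectra
        for x in X:
            self.n += 1
            delta = x - self.mean
            self.mean += delta / self.n
            self.M2 += np.outer(delta, x - self.mean)

    def finalize(self):
        if self.n <= 1:
            raise ValueError("Not enough samples to finalize.")
        return self.mean, self.M2 / (self.n - 1)      # unbiased covariance

rng = np.random.default_rng(1)
data = rng.normal(size=(200, 3))
acc = StreamingStats(3)
for batch in np.array_split(data, 4):                 # simulate an input stream of batches
    acc.update(batch)
mu, cov = acc.finalize()
```

The streamed estimates match the batch statistics computed over all pixels at once.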
update
update(batch_bhwc)

Update streaming statistics with a new batch.

Parameters:

  • batch_bhwc (Tensor): Input batch in BHWC format, shape (B, H, W, C). Required.
Source code in cuvis_ai/anomaly/rx_detector.py
@torch.no_grad()
def update(self, batch_bhwc: torch.Tensor) -> None:
    """Update streaming statistics with a new batch.

    Parameters
    ----------
    batch_bhwc : torch.Tensor
        Input batch in BHWC format, shape (B, H, W, C)
    """
    X = _flatten_bhwc(batch_bhwc).reshape(-1, batch_bhwc.shape[-1])  # (M,C)
    if X.shape[0] <= 1:
        return
    # Adapt accumulator if actual data channels differ from constructor's num_channels
    # (e.g., upstream SoftChannelSelector preserves all channels instead of reducing)
    if X.shape[1] != self._welford._n_features:
        self._welford = WelfordAccumulator(X.shape[1], track_covariance=True).to(
            device=X.device
        )
    self._welford.update(X)
    self._statistically_initialized = False
finalize
finalize()

Compute final mean and covariance from accumulated streaming statistics.

This method converts the running accumulators (_mean, _M2) into the final mean (mu) and covariance (cov) matrices. The covariance is regularized with eps * I for numerical stability, and optionally caches the pseudo-inverse.

Returns:

  • RXGlobal: Returns self for method chaining.

Raises:

  • ValueError: If fewer than 2 samples were accumulated (insufficient for covariance estimation).

Notes

After finalization, mu and cov are stored as buffers (frozen by default). Call unfreeze() to convert them to nn.Parameters for gradient-based training.

Source code in cuvis_ai/anomaly/rx_detector.py
@torch.no_grad()
def finalize(self) -> "RXGlobal":
    """Compute final mean and covariance from accumulated streaming statistics.

    This method converts the running accumulators (_mean, _M2) into the final
    mean (mu) and covariance (cov) matrices. The covariance is regularized with
    eps * I for numerical stability, and optionally caches the pseudo-inverse.

    Returns
    -------
    RXGlobal
        Returns self for method chaining

    Raises
    ------
    ValueError
        If fewer than 2 samples were accumulated (insufficient for covariance estimation)

    Notes
    -----
    After finalization, mu and cov are stored as buffers (frozen by default).
    Call unfreeze() to convert them to nn.Parameters for gradient-based training.
    """
    if self._welford.count <= 1:
        raise ValueError("Not enough samples to finalize.")
    self.mu = self._welford.mean
    cov = self._welford.cov
    if self.eps > 0:
        cov = cov + self.eps * torch.eye(cov.shape[0], device=cov.device, dtype=cov.dtype)
    self.cov = cov
    if self.cache_inverse:
        self.cov_inv = torch.linalg.pinv(cov)
    else:
        self.cov_inv = torch.empty(0, 0)
    self._statistically_initialized = True
    return self
reset
reset()

Reset all statistics and accumulators to empty state.

Clears mu, cov, cov_inv, and the internal streaming accumulator. After reset, the detector must be re-initialized via statistical_initialization() before it can be used for inference.

Notes

Use this method when you need to re-initialize the detector with different training data or when switching between different dataset distributions.

Source code in cuvis_ai/anomaly/rx_detector.py
def reset(self) -> None:
    """Reset all statistics and accumulators to empty state.

    Clears mu, cov, cov_inv, and all streaming accumulators (_mean, _M2, _n).
    After reset, the detector must be re-initialized via statistical_initialization()
    before it can be used for inference.

    Notes
    -----
    Use this method when you need to re-initialize the detector with different
    training data or when switching between different dataset distributions.
    """
    self.mu.zero_()
    self.cov.zero_()
    self.cov_inv.zero_()
    self._welford.reset()
    self._statistically_initialized = False
forward
forward(data, **_)

Forward pass computing anomaly scores.

Parameters:

  • data (Tensor): Input tensor in BHWC format. Required.

Returns:

  • dict[str, Tensor]: Dictionary with "scores" key containing BHW1 anomaly scores.

Source code in cuvis_ai/anomaly/rx_detector.py
def forward(self, data: torch.Tensor, **_) -> dict[str, torch.Tensor]:
    """Forward pass computing anomaly scores.

    Parameters
    ----------
    data : torch.Tensor
        Input tensor in BHWC format

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "scores" key containing BHW1 anomaly scores
    """
    if not self._statistically_initialized or self.mu.numel() == 0:
        raise RuntimeError(
            "RXGlobal not initialized. Call statistical_initialization() before inference."
        )
    B, H, W, C = data.shape
    N = H * W
    X = data.view(B, N, C)
    # Convert dtype if needed, but don't change device (assumes everything on same device)
    Xc = X - self.mu.to(X.dtype)
    if self.cov_inv.numel() > 0:
        cov_inv = self.cov_inv.to(X.dtype)
        md2 = torch.einsum("bnc,cd,bnd->bn", Xc, cov_inv, Xc)  # (B,N)
    else:
        md2 = self._quad_form_solve(Xc, self.cov.to(X.dtype))
    scores = md2.view(B, H, W).unsqueeze(-1)  # Add channel dimension (B,H,W,1)
    return {"scores": scores}

RXPerBatch

RXPerBatch(eps=1e-06, **kwargs)

Bases: RXBase

Computes μ and Σ per image in the batch on the fly; no fit/finalize step is required.

Source code in cuvis_ai/anomaly/rx_detector.py
def __init__(self, eps: float = 1e-6, **kwargs) -> None:
    self.eps = eps
    super().__init__(eps=eps, **kwargs)
forward
forward(data, **_)

Forward pass computing per-batch anomaly scores.

Parameters:

  • data (Tensor): Input tensor in BHWC format. Required.

Returns:

  • dict[str, Tensor]: Dictionary with "scores" key containing BHW1 anomaly scores.

Source code in cuvis_ai/anomaly/rx_detector.py
def forward(self, data: torch.Tensor, **_) -> dict[str, torch.Tensor]:
    """Forward pass computing per-batch anomaly scores.

    Parameters
    ----------
    data : torch.Tensor
        Input tensor in BHWC format

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "scores" key containing BHW1 anomaly scores
    """
    B, H, W, C = data.shape
    N = H * W
    X_flat = _flatten_bhwc(data)  # (B,N,C)
    mu = X_flat.mean(1, keepdim=True)  # (B,1,C)
    Xc = X_flat - mu
    cov = torch.matmul(Xc.transpose(1, 2), Xc) / max(N - 1, 1)  # (B,C,C)
    eye = torch.eye(C, device=data.device, dtype=data.dtype).expand(B, C, C)
    cov = cov + self.eps * eye
    md2 = self._quad_form_solve(Xc, cov)  # (B,N)
    scores = md2.view(B, H, W)
    return {"scores": scores.unsqueeze(-1)}
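The per-image computation mirrors the source above; a minimal NumPy sketch for a single (H, W, C) image (illustrative only, not the node's torch implementation):

```python
import numpy as np

def rx_per_image(cube, eps=1e-6):
    """RX with mu and Sigma estimated from the image itself; cube is (H, W, C)."""
    C = cube.shape[-1]
    X = cube.reshape(-1, C)
    Xc = X - X.mean(axis=0)                              # per-image background mean
    cov = (Xc.T @ Xc) / max(len(X) - 1, 1) + eps * np.eye(C)
    md2 = np.einsum("nc,nc->n", Xc @ np.linalg.pinv(cov), Xc)
    return md2.reshape(cube.shape[:2])

rng = np.random.default_rng(1)
cube = rng.normal(size=(16, 16, 4))
cube[5, 5] += 10.0                                       # anomalous pixel
scores = rx_per_image(cube)
```

Because statistics come from the image itself, a rare anomaly inflates the covariance only slightly and still stands out; this is why the variant suits real-time use without training data.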

lad_detector

Laplacian Anomaly Detector (LAD) for hyperspectral anomaly detection.

This module implements the Laplacian Anomaly Detector, a graph-based approach for detecting spectral anomalies in hyperspectral images. LAD constructs a spectral graph using Cauchy similarity weights and computes anomaly scores based on the graph Laplacian.

The LAD algorithm identifies anomalies by measuring how unusual a pixel's spectral signature is within the spectral manifold learned from background data. Unlike RX detectors that assume Gaussian distributions, LAD captures nonlinear manifold structures through graph construction.

Reference: Gu, Y., Liu, Y., & Zhang, Y. (2008). "A selective KPCA algorithm based on high-order statistics for anomaly detection in hyperspectral imagery." IEEE Geoscience and Remote Sensing Letters, 5(1), 43-47.
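The graph construction described above can be sketched in NumPy, mirroring the use_numpy_laplacian path of the node's finalize() (the helper name cauchy_laplacian is illustrative):

```python
import numpy as np

def cauchy_laplacian(M, normalize=True, eps=1e-12):
    """Channel graph Laplacian from a mean spectrum M of shape (C,)."""
    a = M.mean()
    # Cauchy similarity weights between channel means
    A = 1.0 / (1.0 + (np.abs(M[:, None] - M[None, :]) / (a + eps)) ** 2)
    np.fill_diagonal(A, 0.0)
    d = A.sum(axis=1)
    L = np.diag(d) - A
    if normalize:                                        # L = D^{-1/2} (D - A) D^{-1/2}
        d_inv_sqrt = np.where(d > 0, 1.0 / (np.sqrt(d) + eps), 0.0)
        L = d_inv_sqrt[:, None] * L * d_inv_sqrt[None, :]
    return L

M = np.linspace(0.2, 1.0, 5)                             # toy mean spectrum, 5 channels
L = cauchy_laplacian(M)
x = M.copy()
x[2] += 1.0                                              # perturb one channel
score = (x - M) @ L @ (x - M)                            # LAD score of one pixel spectrum
```

The resulting Laplacian is symmetric and positive semi-definite, so scores are non-negative; unusual spectra produce large quadratic-form values.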

LADGlobal

LADGlobal(
    num_channels,
    eps=1e-08,
    normalize_laplacian=True,
    use_numpy_laplacian=True,
    **kwargs,
)

Bases: Node

Laplacian Anomaly Detector (global), variant 'C' (Cauchy), port-based.

This is the new cuvis.ai v3 implementation of the LAD detector. It follows the same mathematical definition as the legacy v2 LADGlobal, but exposes a port-based interface compatible with CuvisPipeline, StatisticalTrainer, and GradientTrainer.

Ports

INPUT_SPECS

  • data: float32, shape (-1, -1, -1, -1). Input hyperspectral cube in BHWC format.

OUTPUT_SPECS

  • scores: float32, shape (-1, -1, -1, 1). Per-pixel anomaly scores in BHW1 format.

Parameters:

  • num_channels (int): Number of spectral channels in input data. Required.

  • eps (float): Small epsilon value for numerical stability in Laplacian construction. Default: 1e-8.

  • normalize_laplacian (bool): If True, applies symmetric normalization: L = D^{-½} (D - A) D^{-½}. If False, uses the unnormalized Laplacian: L = D - A. Default: True.

  • use_numpy_laplacian (bool): If True, constructs the Laplacian matrix using NumPy (float64, 1e-12 eps) for parity with reference implementations. If False, uses pure PyTorch. Default: True.
Training

After statistical initialization via statistical_initialization(), the node can be made trainable by calling unfreeze(). This converts the mean M and Laplacian L buffers to trainable nn.Parameter objects, enabling gradient-based fine-tuning.

Example

>>> lad = LADGlobal(num_channels=61)
>>> stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
>>> stat_trainer.fit()  # Statistical initialization
>>> lad.unfreeze()  # Enable gradient training
>>> grad_trainer = GradientTrainer(pipeline=pipeline, datamodule=datamodule, ...)
>>> grad_trainer.fit()  # Gradient-based fine-tuning

Source code in cuvis_ai/anomaly/lad_detector.py
def __init__(
    self,
    num_channels: int,
    eps: float = 1e-8,
    normalize_laplacian: bool = True,
    use_numpy_laplacian: bool = True,
    **kwargs: Any,
) -> None:
    self.num_channels = int(num_channels)
    self.eps = float(eps)
    self.normalize_laplacian = bool(normalize_laplacian)
    self.use_numpy_laplacian = bool(use_numpy_laplacian)

    super().__init__(
        num_channels=self.num_channels,
        eps=self.eps,
        normalize_laplacian=self.normalize_laplacian,
        use_numpy_laplacian=self.use_numpy_laplacian,
        **kwargs,
    )

    self._welford = WelfordAccumulator(self.num_channels)
    # Model buffers
    self.register_buffer("M", torch.zeros(self.num_channels, dtype=torch.float64))  # (C,)
    self.register_buffer(
        "L", torch.zeros(self.num_channels, self.num_channels, dtype=torch.float64)
    )  # (C, C)
    self._statistically_initialized = False
statistical_initialization
statistical_initialization(input_stream)

Compute global mean M and Laplacian L from a port-based input stream.

Parameters:

  • input_stream (InputStream): Iterator yielding dicts matching INPUT_SPECS. Expected format: {"data": tensor} where tensor is BHWC. Required.
Source code in cuvis_ai/anomaly/lad_detector.py
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Compute global mean M and Laplacian L from a port-based input stream.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS.
        Expected format: ``{"data": tensor}`` where tensor is BHWC.
    """
    self.reset()

    for batch_data in input_stream:
        x = batch_data.get("data")
        if x is not None:
            self.update(x)

    if self._welford.count <= 0:
        raise RuntimeError("No samples provided to LADGlobal.statistical_initialization()")

    self.finalize()
    self._statistically_initialized = True
update
update(batch_bhwc)

Update running mean statistics from a BHWC batch.

Source code in cuvis_ai/anomaly/lad_detector.py
@torch.no_grad()
def update(self, batch_bhwc: torch.Tensor) -> None:
    """Update running mean statistics from a BHWC batch."""
    B, H, W, C = batch_bhwc.shape
    X = batch_bhwc.reshape(B * H * W, C)
    if X.shape[0] <= 0:
        return
    self._welford.update(X)
    self._statistically_initialized = False
finalize
finalize()

Finalize mean and Laplacian from accumulated statistics.

Source code in cuvis_ai/anomaly/lad_detector.py
@torch.no_grad()
def finalize(self) -> None:
    """Finalize mean and Laplacian from accumulated statistics."""
    if self._welford.count <= 0:
        raise RuntimeError("No samples accumulated for LADGlobal.finalize()")

    M = self._welford.mean.to(dtype=torch.float64)
    C = M.shape[0]
    a = M.mean()

    if self.use_numpy_laplacian:
        # NumPy implementation for exact parity with legacy version
        M_np = M.detach().cpu().numpy()
        A_abs = np.abs(M_np[:, None] - M_np[None, :])
        a_np = float(M_np.mean())
        A_np = 1.0 / (1.0 + (A_abs / (a_np + 1e-12)) ** 2)
        np.fill_diagonal(A_np, 0.0)
        D_np = np.diag(A_np.sum(axis=1))
        L_np = D_np - A_np

        if self.normalize_laplacian:
            d_np = np.diag(D_np)
            d_inv_sqrt_np = np.where(d_np > 0, 1.0 / (np.sqrt(d_np) + 1e-12), 0.0)
            D_inv_sqrt_np = np.diag(d_inv_sqrt_np)
            L_np = D_inv_sqrt_np @ L_np @ D_inv_sqrt_np

        L = torch.from_numpy(L_np).to(dtype=torch.float64, device=M.device)
    else:
        Mi = M.view(C, 1)
        Mj = M.view(1, C)
        denom = a + torch.tensor(1e-12, dtype=torch.float64, device=M.device)
        diff = torch.abs(Mi - Mj) / denom
        A = 1.0 / (1.0 + diff.pow(2))
        A.fill_diagonal_(0.0)

        D = torch.diag(A.sum(dim=1))
        L = D - A

        if self.normalize_laplacian:
            d = torch.diag(D)
            d_inv_sqrt = torch.where(
                d > 0,
                1.0 / torch.sqrt(d + torch.tensor(1e-12, dtype=torch.float64, device=M.device)),
                torch.zeros_like(d),
            )
            D_inv_sqrt = torch.diag(d_inv_sqrt)
            L = D_inv_sqrt @ L @ D_inv_sqrt

    self.M = M
    self.L = L
    self._statistically_initialized = True
reset
reset()

Reset all statistics and model parameters to initial state.

Clears the internal streaming mean accumulator, the global mean (M), and the Laplacian matrix (L). After reset, the detector must be re-initialized via statistical_initialization() before inference.

Notes

Use this method to re-initialize the detector with different training data or when switching between different spectral distributions.

Source code in cuvis_ai/anomaly/lad_detector.py
@torch.no_grad()
def reset(self) -> None:
    """Reset all statistics and model parameters to initial state.

    Clears the streaming mean accumulator (_mean_run), sample count (_count),
    global mean (M), and Laplacian matrix (L). After reset, the detector must
    be re-initialized via statistical_initialization() before inference.

    Notes
    -----
    Use this method to re-initialize the detector with different training data
    or when switching between different spectral distributions.
    """
    self._welford.reset()
    self.M.zero_()
    self.L.zero_()
    self._statistically_initialized = False
forward
forward(data, **_)

Compute LAD anomaly scores for a BHWC cube.

Parameters:

  • data (Tensor): Input tensor in BHWC format. Required.

Returns:

  • dict[str, Tensor]: Dictionary with key "scores" containing BHW1 anomaly scores.

Source code in cuvis_ai/anomaly/lad_detector.py
def forward(self, data: torch.Tensor, **_: Any) -> dict[str, torch.Tensor]:
    """Compute LAD anomaly scores for a BHWC cube.

    Parameters
    ----------
    data : torch.Tensor
        Input tensor in BHWC format.

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with key ``"scores"`` containing BHW1 anomaly scores.
    """
    if self.M.numel() == 0 or self.L.numel() == 0 or not self._statistically_initialized:
        raise RuntimeError(
            "LADGlobal not finalized. Call statistical_initialization() before forward()."
        )

    B, H, W, C = data.shape
    N = H * W

    X = data.view(B, N, C)

    Xc = X - self.M.to(dtype=X.dtype)
    L = self.L.to(dtype=X.dtype)

    scores = torch.einsum("bnc,cd,bnd->bn", Xc, L, Xc).view(B, H, W).unsqueeze(-1)
    return {"scores": scores}

Spectral Angle Mapper

spectral_angle_mapper

Spectral Angle Mapper node.

SpectralAngleMapper

SpectralAngleMapper(num_channels, eps=1e-12, **kwargs)

Bases: Node

Compute per-pixel spectral angle against one or more reference spectra.

Source code in cuvis_ai/node/spectral_angle_mapper.py
def __init__(self, num_channels: int, eps: float = 1e-12, **kwargs: Any) -> None:
    if int(num_channels) <= 0:
        raise ValueError(f"num_channels must be > 0, got {num_channels}")
    self.num_channels = int(num_channels)
    self.eps = float(eps)
    super().__init__(num_channels=self.num_channels, eps=self.eps, **kwargs)
forward
forward(cube, spectral_signature, **_)

Run spectral-angle scoring for all references.

Source code in cuvis_ai/node/spectral_angle_mapper.py
@torch.no_grad()
def forward(
    self,
    cube: torch.Tensor,
    spectral_signature: torch.Tensor,
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Run spectral-angle scoring for all references."""
    ref = spectral_signature.squeeze(1).squeeze(1)  # [N, C]
    channel_count = int(ref.shape[-1])
    ref_mean = ref.mean(dim=-1, keepdim=True)
    ref_norm = ref / (ref_mean + self.eps)

    pixel_mean = cube.mean(dim=-1, keepdim=True)
    cube_norm = cube / (pixel_mean + self.eps)

    ref_expanded = ref_norm.view(1, 1, 1, ref_norm.shape[0], channel_count)
    cube_expanded = cube_norm.unsqueeze(-2)

    dot = (cube_expanded * ref_expanded).sum(dim=-1)
    norms = cube_norm.norm(dim=-1, keepdim=True) * ref_norm.norm(dim=-1).view(1, 1, 1, -1)
    cos_sim = dot / (norms + self.eps)
    scores = torch.acos(cos_sim.clamp(-1.0, 1.0))

    best_scores = scores.amin(dim=-1, keepdim=True)
    identity_mask = scores.argmin(dim=-1).to(torch.int32) + 1

    return {
        "scores": scores,
        "best_scores": best_scores,
        "identity_mask": identity_mask,
    }
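The mean-normalized spectral angle computed above can be sketched in NumPy for a single (H, W, C) cube (illustrative; the node operates on batched torch tensors with BHWC inputs):

```python
import numpy as np

def spectral_angle(cube, refs, eps=1e-12):
    """Angle (radians) between each pixel spectrum and each reference.

    cube: (H, W, C), refs: (N, C). Spectra are normalized by their per-spectrum
    mean first, so the angle is invariant to overall brightness scaling.
    """
    cube_n = cube / (cube.mean(axis=-1, keepdims=True) + eps)
    refs_n = refs / (refs.mean(axis=-1, keepdims=True) + eps)
    dot = np.einsum("hwc,nc->hwn", cube_n, refs_n)
    norms = np.linalg.norm(cube_n, axis=-1)[..., None] * np.linalg.norm(refs_n, axis=-1)
    cos = np.clip(dot / (norms + eps), -1.0, 1.0)
    return np.arccos(cos)                                # (H, W, N)

rng = np.random.default_rng(2)
refs = rng.uniform(0.1, 1.0, size=(2, 6))
cube = np.tile(refs[1] * 3.0, (4, 4, 1))                 # every pixel = scaled copy of ref 1
angles = spectral_angle(cube, refs)
```

A pixel that is a scaled copy of a reference gets an angle of (essentially) zero to that reference, which is the scale invariance that makes SAM robust to illumination changes.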

Spectral Extraction

spectral_extractor

Spectral signature extraction nodes for hyperspectral cubes.

BBoxSpectralExtractor

BBoxSpectralExtractor(
    center_crop_scale=0.65,
    min_crop_pixels=4,
    trim_fraction=0.1,
    l2_normalize=True,
    aggregation="median",
    **kwargs,
)

Bases: Node

Extract per-bbox spectral signatures with trimmed median/mean and std.

Given an HSI cube [B, H, W, C] and detection bboxes [B, N, 4] (xyxy format), extracts a center-cropped spectral signature for each bbox. Outputs the per-band aggregated signature, per-band std, and a binary validity mask.

Source code in cuvis_ai/node/spectral_extractor.py
def __init__(
    self,
    center_crop_scale: float = 0.65,
    min_crop_pixels: int = 4,
    trim_fraction: float = 0.10,
    l2_normalize: bool = True,
    aggregation: str = "median",
    **kwargs: Any,
) -> None:
    if not (0.0 < center_crop_scale <= 1.0):
        raise ValueError("center_crop_scale must be in (0.0, 1.0].")
    if min_crop_pixels < 1:
        raise ValueError("min_crop_pixels must be >= 1.")
    if not (0.0 <= trim_fraction < 0.5):
        raise ValueError("trim_fraction must be in [0.0, 0.5).")
    if aggregation not in ("median", "mean"):
        raise ValueError("aggregation must be 'median' or 'mean'.")

    self.center_crop_scale = float(center_crop_scale)
    self.min_crop_pixels = int(min_crop_pixels)
    self.trim_fraction = float(trim_fraction)
    self.l2_normalize = bool(l2_normalize)
    self.aggregation = str(aggregation)

    super().__init__(
        center_crop_scale=center_crop_scale,
        min_crop_pixels=min_crop_pixels,
        trim_fraction=trim_fraction,
        l2_normalize=l2_normalize,
        aggregation=aggregation,
        **kwargs,
    )
forward
forward(cube, bboxes, context=None, **_)

Extract per-bbox spectral signatures for the first batch element.

Source code in cuvis_ai/node/spectral_extractor.py
@torch.no_grad()
def forward(
    self,
    cube: torch.Tensor,
    bboxes: torch.Tensor,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Extract per-bbox spectral signatures for the first batch element."""
    if cube.ndim != 4:
        raise ValueError(f"cube must have shape [B, H, W, C], got {tuple(cube.shape)}.")
    if cube.shape[0] < 1:
        raise ValueError("cube must have B >= 1.")
    if bboxes.ndim != 3 or bboxes.shape[2] != 4:
        raise ValueError(f"bboxes must have shape [B, N, 4], got {tuple(bboxes.shape)}.")

    cube_0 = cube[0]  # [H, W, C]
    img_h, img_w, num_channels = (
        int(cube_0.shape[0]),
        int(cube_0.shape[1]),
        int(cube_0.shape[2]),
    )

    num_boxes = int(bboxes.shape[1])

    # Empty detections
    if num_boxes == 0:
        empty_sig = torch.empty((1, 0, num_channels), dtype=torch.float32, device=cube.device)
        empty_valid = torch.empty((1, 0), dtype=torch.int32, device=cube.device)
        return {
            "spectral_signatures": empty_sig,
            "spectral_std": empty_sig.clone(),
            "spectral_valid": empty_valid,
        }

    signatures: list[torch.Tensor] = []
    stds: list[torch.Tensor] = []
    valids: list[int] = []

    for i in range(num_boxes):
        bx1, by1, bx2, by2 = [int(v) for v in bboxes[0, i].round().tolist()]

        cx1, cy1, cx2, cy2 = self._center_crop_bbox(bx1, by1, bx2, by2, img_h, img_w)

        cw = cx2 - cx1
        ch = cy2 - cy1
        if cw <= 0 or ch <= 0:
            # Bbox fully outside image
            zeros = torch.zeros(num_channels, dtype=cube_0.dtype, device=cube_0.device)
            signatures.append(zeros)
            stds.append(zeros.clone())
            valids.append(0)
            continue

        # Gather pixels from crop region: [P, C]
        pixels = cube_0[cy1:cy2, cx1:cx2, :].reshape(-1, num_channels)

        sig, std = self._trimmed_stats(pixels, num_channels)

        is_valid = sig.norm() >= 1e-8
        if is_valid and self.l2_normalize:
            sig_norm = sig.norm()
            if sig_norm >= 1e-8:
                sig = sig / sig_norm

        signatures.append(sig)
        stds.append(std)
        valids.append(1 if is_valid else 0)

    signatures_t = torch.stack(signatures, dim=0).unsqueeze(0)  # [1, N, C]
    stds_t = torch.stack(stds, dim=0).unsqueeze(0)  # [1, N, C]
    valids_t = torch.tensor(valids, dtype=torch.int32, device=cube.device).unsqueeze(
        0
    )  # [1, N]

    return {
        "spectral_signatures": signatures_t.to(torch.float32),
        "spectral_std": stds_t.to(torch.float32),
        "spectral_valid": valids_t,
    }
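The trimmed per-band aggregation used for each bbox crop can be sketched as follows (illustrative NumPy; the node's actual _trimmed_stats helper may differ in detail):

```python
import numpy as np

def trimmed_signature(pixels, trim_fraction=0.1, aggregation="median"):
    """Per-band trimmed aggregate and std from a (P, C) pixel block (sketch)."""
    P = pixels.shape[0]
    k = int(P * trim_fraction)
    sorted_px = np.sort(pixels, axis=0)                  # sort each band independently
    trimmed = sorted_px[k:P - k] if P - 2 * k >= 1 else sorted_px
    agg = np.median(trimmed, axis=0) if aggregation == "median" else trimmed.mean(axis=0)
    return agg, trimmed.std(axis=0)

rng = np.random.default_rng(3)
pixels = rng.normal(5.0, 0.1, size=(50, 4))              # 50 crop pixels, 4 bands
pixels[0] = 100.0                                        # one outlier pixel in the crop
sig, std = trimmed_signature(pixels)
```

Trimming discards the extreme tail in each band, so a few outlier pixels (specular highlights, mixed edges) barely affect the extracted signature.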

SpectralSignatureExtractor

SpectralSignatureExtractor(
    trim_fraction=0.1,
    min_mask_pixels=10,
    zero_norm_threshold=1e-08,
    **kwargs,
)

Bases: Node

Extract per-object spectral signatures from SAM-style label masks.

Source code in cuvis_ai/node/spectral_extractor.py
def __init__(
    self,
    trim_fraction: float = 0.1,
    min_mask_pixels: int = 10,
    zero_norm_threshold: float = 1e-8,
    **kwargs: Any,
) -> None:
    if not (0.0 <= trim_fraction < 0.5):
        raise ValueError("trim_fraction must be in [0.0, 0.5).")
    if min_mask_pixels < 1:
        raise ValueError("min_mask_pixels must be >= 1.")
    if zero_norm_threshold < 0.0:
        raise ValueError("zero_norm_threshold must be non-negative.")

    self.trim_fraction = float(trim_fraction)
    self.min_mask_pixels = int(min_mask_pixels)
    self.zero_norm_threshold = float(zero_norm_threshold)

    super().__init__(
        trim_fraction=trim_fraction,
        min_mask_pixels=min_mask_pixels,
        zero_norm_threshold=zero_norm_threshold,
        **kwargs,
    )
forward
forward(
    cube,
    mask,
    object_ids=None,
    wavelengths=None,
    context=None,
    **_,
)

Extract per-object signatures for the first batch element.

Source code in cuvis_ai/node/spectral_extractor.py
def forward(
    self,
    cube: torch.Tensor,
    mask: torch.Tensor,
    object_ids: torch.Tensor | None = None,
    wavelengths: np.ndarray | torch.Tensor | None = None,  # noqa: ARG002
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Extract per-object signatures for the first batch element."""
    if cube.ndim != 4:
        raise ValueError(f"cube must have shape [B, H, W, C], got {tuple(cube.shape)}.")
    if cube.shape[0] < 1:
        raise ValueError("cube must have B >= 1.")

    cube_0 = cube[0]
    height, width, num_channels = (
        int(cube_0.shape[0]),
        int(cube_0.shape[1]),
        int(cube_0.shape[2]),
    )

    mask_2d = self._parse_mask(mask).to(device=cube_0.device)
    mask_2d = self._resize_mask_if_needed(mask_2d, height=height, width=width)

    parsed_ids = self._parse_object_ids(object_ids)
    if parsed_ids is None:
        resolved_ids = torch.unique(mask_2d[mask_2d != 0], sorted=True).to(torch.int64)
    else:
        resolved_ids = parsed_ids.to(device=cube_0.device, dtype=torch.int64)

    if resolved_ids.numel() == 0:
        empty = torch.empty((1, 0, num_channels), dtype=cube_0.dtype, device=cube_0.device)
        return {"signatures": empty, "signatures_std": empty.clone()}

    signatures: list[torch.Tensor] = []
    signatures_std: list[torch.Tensor] = []
    for obj_id in resolved_ids.tolist():
        obj_mask = mask_2d == int(obj_id)
        if not bool(obj_mask.any()):
            zeros = torch.zeros(num_channels, dtype=cube_0.dtype, device=cube_0.device)
            signatures.append(zeros)
            signatures_std.append(zeros.clone())
            continue

        pixels = cube_0[obj_mask]
        mean, std = self._trimmed_stats(pixels, num_channels=num_channels)
        signatures.append(mean)
        signatures_std.append(std)

    signatures_t = torch.stack(signatures, dim=0).unsqueeze(0)
    signatures_std_t = torch.stack(signatures_std, dim=0).unsqueeze(0)
    return {
        "signatures": signatures_t.to(torch.float32),
        "signatures_std": signatures_std_t.to(torch.float32),
    }
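At its core, the mask-based extraction above groups pixels by object id and aggregates each group; a minimal NumPy sketch with a hypothetical helper name (untrimmed mean for brevity, whereas the node uses trimmed statistics):

```python
import numpy as np

def signatures_from_mask(cube, mask):
    """Mean spectrum per object id in a label mask; cube (H, W, C), mask (H, W)."""
    ids = np.unique(mask[mask != 0])                     # nonzero ids are objects
    if len(ids) == 0:
        return ids, np.empty((0, cube.shape[-1]))
    sigs = np.stack([cube[mask == i].mean(axis=0) for i in ids])
    return ids, sigs

cube = np.zeros((4, 4, 3))
mask = np.zeros((4, 4), dtype=int)
cube[:2, :2] = [1.0, 2.0, 3.0]; mask[:2, :2] = 1         # object 1
cube[2:, 2:] = [4.0, 5.0, 6.0]; mask[2:, 2:] = 2         # object 2
ids, sigs = signatures_from_mask(cube, mask)
```

Each row of sigs is the mean spectrum of one labeled object, in ascending id order, matching the per-object layout of the node's [1, N, C] output.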