
Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Utility Nodes

Utility nodes cover binary decisions, label conversion, prompt injection, and small transforms that bind larger workflows together.

Deciders

binary_decider

Binary decision nodes for thresholding anomaly scores and logits.

This module provides threshold-based decision nodes that convert continuous anomaly scores or logits into binary decisions (anomaly/normal). Two strategies are available:

  • BinaryDecider: Fixed threshold applied globally to sigmoid-transformed logits
  • QuantileBinaryDecider: Adaptive per-batch thresholding using quantile statistics

Decision nodes are typically placed at the end of anomaly detection pipelines to convert detector outputs into actionable binary masks for visualization or evaluation.

BinaryDecider

BinaryDecider(threshold=0.5, **kwargs)

Bases: BaseDecider

Simple decider node using a static threshold to classify data.

Accepts logits as input, applies a sigmoid transformation to map them to probabilities in [0, 1], then applies the threshold to produce binary decisions.

Parameters:

Name Type Description Default
threshold float

The threshold to use for classification after sigmoid. Values >= threshold are classified as anomalies (True).

0.5

Examples:

>>> from cuvis_ai.deciders.binary_decider import BinaryDecider
>>> import torch
>>>
>>> # Create decider with default threshold
>>> decider = BinaryDecider(threshold=0.5)
>>>
>>> # Apply to RX anomaly logits
>>> logits = torch.randn(4, 256, 256, 1)  # [B, H, W, C]
>>> output = decider.forward(logits=logits)
>>> decisions = output["decisions"]  # [4, 256, 256, 1] boolean mask
>>>
>>> # Use in pipeline
>>> pipeline.connect(
...     (logit_head.logits, decider.logits),
...     (decider.decisions, visualizer.mask),
... )
See Also

QuantileBinaryDecider : Adaptive per-batch thresholding
ScoreToLogit : Convert scores to logits before decisioning

Source code in cuvis_ai/deciders/binary_decider.py
def __init__(self, threshold: float = 0.5, **kwargs) -> None:
    self.threshold = threshold
    # Forward threshold to BaseDecider so Serializable captures it in hparams
    super().__init__(threshold=threshold, **kwargs)
forward
forward(logits, **_)

Apply sigmoid and threshold-based decisioning on channels-last data.

Args: logits: Tensor shaped (B, H, W, C) containing logits.

Returns: Dictionary with "decisions" key containing (B, H, W, 1) decision mask.

Source code in cuvis_ai/deciders/binary_decider.py
def forward(
    self,
    logits: Tensor,
    **_: Any,
) -> dict[str, Tensor]:
    """Apply sigmoid and threshold-based decisioning on channels-last data.

    Args:
        logits: Tensor shaped (B, H, W, C) containing logits.

    Returns:
        Dictionary with "decisions" key containing (B, H, W, 1) decision mask.
    """

    # Apply sigmoid if needed to convert logits to probabilities
    tensor = torch.sigmoid(logits)

    # Apply threshold to get binary decisions
    decisions = tensor >= self.threshold
    return {"decisions": decisions}

QuantileBinaryDecider

QuantileBinaryDecider(
    quantile=0.995, reduce_dims=None, **kwargs
)

Bases: BinaryDecider

Quantile-based thresholding node operating on BHWC logits or scores.

This decider computes a tensor-valued threshold per batch item using the requested quantile over one or more non-batch dimensions, then produces a binary mask where values greater than or equal to that threshold are marked as anomalies. Useful for adaptive thresholding when score distributions vary across batches.

Parameters:

Name Type Description Default
quantile float

Quantile in the closed interval [0, 1] used for the threshold computation. Higher values (e.g., 0.99, 0.995) are typical for anomaly detection to capture rare events.

0.995
reduce_dims Sequence[int] | None

Axes (relative to the input tensor) over which to compute the quantile. When None (default), all non-batch dimensions (H, W, C) are reduced. For per-channel thresholds, use reduce_dims=[1, 2] (reduce H, W only).

None

Examples:

>>> from cuvis_ai.deciders.binary_decider import QuantileBinaryDecider
>>> import torch
>>>
>>> # Create quantile-based decider (99.5th percentile)
>>> decider = QuantileBinaryDecider(quantile=0.995)
>>>
>>> # Apply to anomaly scores
>>> scores = torch.randn(4, 256, 256, 1)  # [B, H, W, C]
>>> output = decider.forward(logits=scores)
>>> decisions = output["decisions"]  # [4, 256, 256, 1] boolean mask
>>>
>>> # Per-channel thresholding (reduce H, W only)
>>> decider_perchannel = QuantileBinaryDecider(
...     quantile=0.99,
...     reduce_dims=[1, 2],  # Compute threshold per channel
... )
See Also

BinaryDecider : Fixed threshold decisioning

Source code in cuvis_ai/deciders/binary_decider.py
def __init__(
    self,
    quantile: float = 0.995,
    reduce_dims: Sequence[int] | None = None,
    **kwargs,
) -> None:
    self._validate_quantile(quantile)
    self.quantile = float(quantile)
    self.reduce_dims = (
        tuple(int(dim) for dim in reduce_dims) if reduce_dims is not None else None
    )
    # Forward init params so Serializable records them for config serialization
    super().__init__(quantile=self.quantile, reduce_dims=self.reduce_dims, **kwargs)
forward
forward(logits, **_)

Apply quantile-based thresholding to produce binary decisions.

Computes per-batch thresholds using the specified quantile over reduce_dims, then classifies values >= threshold as anomalies.

Parameters:

Name Type Description Default
logits Tensor

Input logits or anomaly scores, shape (B, H, W, C)

required

Returns:

Type Description
dict[str, Tensor]

Dictionary containing:

  • "decisions" : Tensor
    Binary decision mask, shape (B, H, W, 1)
Source code in cuvis_ai/deciders/binary_decider.py
def forward(self, logits: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply quantile-based thresholding to produce binary decisions.

    Computes per-batch thresholds using the specified quantile over reduce_dims,
    then classifies values >= threshold as anomalies.

    Parameters
    ----------
    logits : Tensor
        Input logits or anomaly scores, shape (B, H, W, C)

    Returns
    -------
    dict[str, Tensor]
        Dictionary containing:

        - "decisions" : Tensor
            Binary decision mask, shape (B, H, W, 1)
    """
    tensor = logits
    dims = resolve_reduce_dims(self.reduce_dims, tensor.dim())

    if len(dims) == 1:
        threshold = torch.quantile(
            tensor,
            self.quantile,
            dim=dims[0],
            keepdim=True,
        )
    else:
        tensor_ndim = tensor.dim()
        dims_to_keep = tuple(i for i in range(tensor_ndim) if i not in dims)
        new_order = (*dims_to_keep, *dims)
        permuted = tensor.permute(new_order)
        sizes_keep = [permuted.size(i) for i in range(len(dims_to_keep))]
        flattened = permuted.reshape(*sizes_keep, -1)
        threshold_flat = torch.quantile(
            flattened,
            self.quantile,
            dim=len(dims_to_keep),
            keepdim=True,
        )
        threshold_permuted = threshold_flat.reshape(
            *sizes_keep,
            *([1] * len(dims)),
        )
        inverse_order = [0] * tensor_ndim
        for original_idx, permuted_idx in enumerate(new_order):
            inverse_order[permuted_idx] = original_idx
        threshold = threshold_permuted.permute(*inverse_order)

    decisions = (tensor >= threshold).to(torch.bool)
    return {"decisions": decisions}
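The multi-dimension branch above flattens the reduced axes, takes the quantile once, and reshapes the result so it broadcasts against the input. A reduced sketch of the same idea for `reduce_dims=(1, 2)` on BHWC data (plain torch, not the node API):

```python
import torch

torch.manual_seed(0)
x = torch.randn(2, 4, 4, 3)  # [B, H, W, C]

# Per-channel quantile over H, W: move C next to B, flatten H*W, take the quantile
flat = x.permute(0, 3, 1, 2).reshape(2, 3, -1)          # [B, C, H*W]
thr = torch.quantile(flat, 0.75, dim=-1, keepdim=True)  # [B, C, 1]
thr = thr.reshape(2, 1, 1, 3)                           # broadcastable against BHWC

decisions = x >= thr
print(tuple(decisions.shape))  # (2, 4, 4, 3)
```

With 16 values per channel, the 0.75 quantile leaves exactly the top 4 values per channel marked True, so about a quarter of each channel is flagged.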

resolve_reduce_dims

resolve_reduce_dims(reduce_dims, tensor_ndim)

Resolve reduction dimensions, handling negative indices.

Parameters:

Name Type Description Default
reduce_dims tuple[int, ...] | None

Dimension indices to reduce over (may contain negatives). When None, returns all non-batch dimensions (1, ..., ndim-1).

required
tensor_ndim int

Number of dimensions in the tensor.

required

Returns:

Type Description
tuple[int, ...]

Sorted, deduplicated positive dimension indices.

Source code in cuvis_ai/deciders/binary_decider.py
def resolve_reduce_dims(reduce_dims: tuple[int, ...] | None, tensor_ndim: int) -> tuple[int, ...]:
    """Resolve reduction dimensions, handling negative indices.

    Parameters
    ----------
    reduce_dims : tuple[int, ...] | None
        Dimension indices to reduce over (may contain negatives).
        When ``None``, returns all non-batch dimensions ``(1, ..., ndim-1)``.
    tensor_ndim : int
        Number of dimensions in the tensor.

    Returns
    -------
    tuple[int, ...]
        Sorted, deduplicated positive dimension indices.
    """
    if reduce_dims is None:
        return tuple(range(1, tensor_ndim))

    resolved: list[int] = []
    for dim in reduce_dims:
        adjusted = dim if dim >= 0 else tensor_ndim + dim
        resolved.append(adjusted)
    return tuple(sorted(set(resolved)))
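As a quick sanity check, the resolution logic behaves as follows (the function is restated inline so the snippet is self-contained):

```python
def resolve_reduce_dims(reduce_dims, tensor_ndim):
    # None means "all non-batch dimensions": (1, ..., ndim - 1)
    if reduce_dims is None:
        return tuple(range(1, tensor_ndim))
    # Map negative indices to positive ones, then sort and deduplicate
    resolved = [d if d >= 0 else tensor_ndim + d for d in reduce_dims]
    return tuple(sorted(set(resolved)))

# For a BHWC tensor (ndim=4): default reduces H, W, C
print(resolve_reduce_dims(None, 4))     # (1, 2, 3)
# Negative indices resolve relative to ndim: -1 -> 3
print(resolve_reduce_dims((-1, 1), 4))  # (1, 3)
# Duplicates collapse: 2 and -2 both refer to dim 2
print(resolve_reduce_dims((2, -2), 4))  # (2,)
```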

two_stage_decider

Two-Stage Binary Decision Module.

This module provides a two-stage binary decision node that first applies an image-level anomaly gate based on top-k statistics, then applies pixel-level quantile thresholding only for images that pass the gate.

This approach reduces false positives by filtering out images with low overall anomaly scores before applying pixel-level decisions.

See Also

cuvis_ai.deciders.binary_decider : Simple threshold-based binary decisions

TwoStageBinaryDecider

TwoStageBinaryDecider(
    image_threshold=0.5,
    top_k_fraction=0.001,
    quantile=0.995,
    reduce_dims=None,
    **kwargs,
)

Bases: BinaryDecider

Two-stage binary decider: image-level gate + pixel quantile mask.

Source code in cuvis_ai/deciders/two_stage_decider.py
def __init__(
    self,
    image_threshold: float = 0.5,
    top_k_fraction: float = 0.001,
    quantile: float = 0.995,
    reduce_dims: Sequence[int] | None = None,
    **kwargs,
) -> None:
    if not 0.0 <= image_threshold <= 1.0:
        raise ValueError("image_threshold must be within [0, 1]")
    if not 0.0 < top_k_fraction <= 1.0:
        raise ValueError("top_k_fraction must be in (0, 1]")
    if not 0.0 <= quantile <= 1.0:
        raise ValueError("quantile must be within [0, 1]")

    self.image_threshold = float(image_threshold)
    self.top_k_fraction = float(top_k_fraction)
    self.quantile = float(quantile)
    self.reduce_dims = (
        tuple(int(dim) for dim in reduce_dims) if reduce_dims is not None else None
    )
    super().__init__(
        image_threshold=self.image_threshold,
        top_k_fraction=self.top_k_fraction,
        quantile=self.quantile,
        reduce_dims=self.reduce_dims,
        **kwargs,
    )
forward
forward(logits, **_)

Apply two-stage binary decision: image-level gate + pixel quantile.

Stage 1: Compute image-level anomaly score from top-k pixel scores. If below threshold, return blank mask (no anomalies).

Stage 2: For images passing the gate, apply pixel-level quantile thresholding to create binary anomaly mask.

Parameters:

Name Type Description Default
logits Tensor

Anomaly scores [B, H, W, C] or [B, H, W, 1].

required
**_ Any

Additional unused keyword arguments.

{}

Returns:

Type Description
dict[str, Tensor]

Dictionary with "decisions" key containing binary masks [B, H, W, 1].

Notes

The image-level score is computed as the mean of the top top_k_fraction share of pixel scores (the top 0.1% by default). For multi-channel inputs, the per-pixel max across channels is used.
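The gating arithmetic can be sketched independently of the node API. This is a minimal single-image sketch, not the node's forward implementation; parameter names mirror the constructor above:

```python
import math

import torch

def two_stage_mask(scores, image_threshold=0.5, top_k_fraction=0.001, quantile=0.995):
    """scores: [H, W] anomaly scores for a single image."""
    flat = scores.reshape(-1)
    # Stage 1: image-level gate from the mean of the top-k pixel scores
    k = max(1, math.ceil(flat.numel() * top_k_fraction))
    image_score = torch.topk(flat, k).values.mean().item()
    if image_score < image_threshold:
        # Gate failed: blank mask, no anomalies
        return torch.zeros_like(scores, dtype=torch.bool)
    # Stage 2: per-image quantile threshold on the pixel scores
    return scores >= torch.quantile(flat, quantile)

scores = torch.arange(64, dtype=torch.float32).reshape(8, 8)
mask = two_stage_mask(scores, image_threshold=0.5, top_k_fraction=0.02, quantile=0.9)
print(int(mask.sum()))   # 7: pixels above the 0.9-quantile threshold
blank = two_stage_mask(scores, image_threshold=1000.0)
print(int(blank.sum()))  # 0: the image-level gate filters the whole image
```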

Source code in cuvis_ai/deciders/two_stage_decider.py
def forward(self, logits: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply two-stage binary decision: image-level gate + pixel quantile.

    Stage 1: Compute image-level anomaly score from top-k pixel scores.
    If below threshold, return blank mask (no anomalies).

    Stage 2: For images passing the gate, apply pixel-level quantile
    thresholding to create binary anomaly mask.

    Parameters
    ----------
    logits : Tensor
        Anomaly scores [B, H, W, C] or [B, H, W, 1].
    **_ : Any
        Additional unused keyword arguments.

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "decisions" key containing binary masks [B, H, W, 1].

    Notes
    -----
    The image-level score is computed as the mean of the top ``top_k_fraction``
    share of pixel scores. For multi-channel inputs, the max across channels is
    used for each pixel.
    """
    tensor = logits
    bsz = tensor.shape[0]

    # DEBUG: Log input tensor stats
    logger.debug(
        f"TwoStageDecider input: shape={tensor.shape}, device={tensor.device}, "
        f"dtype={tensor.dtype}, min={tensor.min().item():.6f}, "
        f"max={tensor.max().item():.6f}, mean={tensor.mean().item():.6f}"
    )

    decisions = []
    for b in range(bsz):
        scores = tensor[b]  # [H, W, C]
        # Reduce to per-pixel max for image score
        if scores.dim() == 3:
            pixel_scores = scores.max(dim=-1)[0]
        else:
            pixel_scores = scores
        flat = pixel_scores.reshape(-1)
        k = max(
            1,
            int(
                torch.ceil(
                    torch.tensor(flat.numel() * self.top_k_fraction, dtype=torch.float32)
                ).item()
            ),
        )
        topk_vals, _ = torch.topk(flat, k)
        image_score = topk_vals.mean().item()  # Convert to Python float for comparison

        # DEBUG: Log intermediate computation values
        logger.debug(
            f"TwoStageDecider[batch={b}]: k={k}, topk_min={topk_vals.min().item():.6f}, "
            f"topk_max={topk_vals.max().item():.6f}, image_score={image_score:.6f}"
        )

        # Stage 1: Image-level gate
        if image_score < self.image_threshold:
            # Gate failed: return blank mask
            logger.debug(
                f"TwoStageDecider: image_score={image_score:.6f} < threshold={self.image_threshold:.6f}, "
                f"returning blank mask"
            )
            decisions.append(
                torch.zeros((*pixel_scores.shape, 1), dtype=torch.bool, device=tensor.device)
            )
            continue

        # Stage 2: Gate passed, apply pixel-level quantile thresholding
        logger.debug(
            f"TwoStageDecider: image_score={image_score:.6f} >= threshold={self.image_threshold:.6f}, "
            f"applying quantile thresholding (q={self.quantile})"
        )
        # Compute quantile threshold: reduce over all dimensions to get scalar per batch item
        # This matches QuantileBinaryDecider behavior: for [B, H, W, C] it reduces over (H, W, C)
        # For single batch item [H, W, C], we reduce over all dims (0, 1, 2)
        threshold = torch.quantile(scores, self.quantile)

        # Apply threshold: for multi-channel scores, take max across channels first
        if scores.dim() == 3:  # [H, W, C]
            # Take max across channels to get per-pixel score, then threshold
            pixel_scores = scores.max(dim=-1, keepdim=False)[0]  # [H, W]
            binary_map = (pixel_scores >= threshold).unsqueeze(-1).to(torch.bool)  # [H, W, 1]
        else:  # [H, W] - single channel
            binary_map = (scores >= threshold).unsqueeze(-1).to(torch.bool)  # [H, W, 1]

        decisions.append(binary_map)

    return {"decisions": torch.stack(decisions, dim=0)}

Conversion

conversion

Conversion nodes for anomaly and segmentation pipelines.

This module provides:

  • ScoreToLogit: affine conversion from anomaly scores to logits
  • DecisionToMask: combine binary decisions with identity IDs into masks

ScoreToLogit

ScoreToLogit(init_scale=1.0, init_bias=0.0, **kwargs)

Bases: Node

Trainable head that converts RX scores to anomaly logits.

This node takes RX anomaly scores (typically Mahalanobis distances) and applies a learned affine transformation to produce logits suitable for binary classification with BCEWithLogitsLoss.

The transformation is: logit = scale * (score - bias)

Parameters:

Name Type Description Default
init_scale float

Initial value for the scale parameter

1.0
init_bias float

Initial value for the bias parameter (threshold)

0.0

Attributes:

Name Type Description
scale Parameter or Tensor

Scale factor applied to scores

bias Parameter or Tensor

Bias (threshold) subtracted from scores before scaling

Examples:

>>> # After RX detector
>>> rx = RXGlobal(eps=1e-6)
>>> logit_head = ScoreToLogit(init_scale=1.0, init_bias=5.0)
>>> logit_head.unfreeze()  # Enable gradient training
>>> graph.connect(rx.scores, logit_head.scores)
Source code in cuvis_ai/node/conversion.py
def __init__(
    self,
    init_scale: float = 1.0,
    init_bias: float = 0.0,
    **kwargs,
) -> None:
    self.init_scale = init_scale
    self.init_bias = init_bias

    super().__init__(
        init_scale=init_scale,
        init_bias=init_bias,
        **kwargs,
    )

    # Initialize as buffers (frozen by default)
    self.register_buffer("scale", torch.tensor(init_scale, dtype=torch.float32))
    self.register_buffer("bias", torch.tensor(init_bias, dtype=torch.float32))

    self._welford = WelfordAccumulator(1)
    # Allow using the head with the provided init_scale/init_bias without forcing a fit()
    self._statistically_initialized = True
statistical_initialization
statistical_initialization(input_stream)

Initialize bias from statistics of RX scores using streaming approach.

Uses Welford's algorithm for numerically stable online computation of mean and standard deviation, similar to RXGlobal.

Parameters:

Name Type Description Default
input_stream InputStream

Iterator yielding dicts matching INPUT_SPECS (port-based format). Expected format: {"scores": tensor}, where tensor contains the RX scores.

required
Source code in cuvis_ai/node/conversion.py
def statistical_initialization(self, input_stream) -> None:
    """Initialize bias from statistics of RX scores using streaming approach.

    Uses Welford's algorithm for numerically stable online computation of
    mean and standard deviation, similar to RXGlobal.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"scores": tensor} where tensor is the RX scores
    """
    self.reset()
    for batch_data in input_stream:
        # Extract scores from port-based dict
        scores = batch_data.get("scores")
        if scores is not None:
            self.update(scores)

    if self._welford.count <= 1:
        self._statistically_initialized = False
        raise RuntimeError(
            "ScoreToLogit.statistical_initialization() received insufficient samples. "
            "Expected at least 2 score values."
        )
    self.finalize()
update
update(scores)

Update running statistics with a batch of scores.

Parameters:

Name Type Description Default
scores Tensor

Batch of RX scores in BHWC format

required
Source code in cuvis_ai/node/conversion.py
@torch.no_grad()
def update(self, scores: torch.Tensor) -> None:
    """Update running statistics with a batch of scores.

    Parameters
    ----------
    scores : torch.Tensor
        Batch of RX scores in BHWC format
    """
    X = scores.flatten()
    if X.shape[0] <= 1:
        return
    self._welford.update(X)
    self._statistically_initialized = False
finalize
finalize()

Finalize statistics and set bias to mean + 2*std.

This threshold (mean + 2*std) is a common heuristic for anomaly detection, capturing ~97.7% of normal data under a one-sided Gaussian assumption.

Source code in cuvis_ai/node/conversion.py
@torch.no_grad()
def finalize(self) -> None:
    """Finalize statistics and set bias to mean + 2*std.

    This threshold (mean + 2*std) is a common heuristic for anomaly detection,
    capturing ~97.7% of normal data under a one-sided Gaussian assumption.
    """
    if self._welford.count <= 1:
        raise ValueError("Not enough samples to finalize ScoreToLogit statistics.")

    mean = self._welford.mean.squeeze()
    std = self._welford.std.squeeze()

    # Set bias to mean + 2*std (threshold for anomalies)
    self.bias = mean + 2.0 * std
    self._statistically_initialized = True
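The effect of the mean + 2*std bias can be checked with a few lines of plain torch. This sketches the statistics only, not the Welford streaming path, using synthetic scores:

```python
import torch

torch.manual_seed(0)
scores = torch.randn(10_000) * 2.0 + 5.0  # synthetic "normal" RX scores

# finalize() sets bias = mean + 2 * std; forward() computes scale * (score - bias)
bias = scores.mean() + 2.0 * scores.std()
logits = 1.0 * (scores - bias)

# Under a one-sided Gaussian assumption, roughly 2-3% of normal scores
# land above the bias and therefore map to positive logits.
frac_positive = (logits > 0).float().mean().item()
print(round(frac_positive, 3))
```

Any score above the learned bias comes out as a positive logit, which `predict_anomalies` (below) then flags as an anomaly.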
reset
reset()

Reset all statistics and accumulators.

Source code in cuvis_ai/node/conversion.py
def reset(self) -> None:
    """Reset all statistics and accumulators."""
    self._welford.reset()
    # Keep explicit init_scale/init_bias usable in inference-only runs.
    # Statistical flows still transition through update()/finalize().
    self._statistically_initialized = True
forward
forward(scores, **_)

Transform RX scores to logits.

Parameters:

Name Type Description Default
scores Tensor

Input RX scores with shape (B, H, W, 1)

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "logits" key containing transformed scores

Source code in cuvis_ai/node/conversion.py
def forward(self, scores: torch.Tensor, **_) -> dict[str, torch.Tensor]:
    """Transform RX scores to logits.

    Parameters
    ----------
    scores : torch.Tensor
        Input RX scores with shape (B, H, W, 1)

    Returns
    -------
    dict[str, torch.Tensor]
        Dictionary with "logits" key containing transformed scores
    """

    if not self._statistically_initialized:
        raise RuntimeError(
            "ScoreToLogit not initialized. Call statistical_initialization() before forward()."
        )
    # Apply affine transformation: logit = scale * (score - bias)
    logits = self.scale * (scores - self.bias)

    return {"logits": logits}
get_threshold
get_threshold()

Get the current anomaly threshold (bias value).

Returns:

Type Description
float

Current threshold value

Source code in cuvis_ai/node/conversion.py
def get_threshold(self) -> float:
    """Get the current anomaly threshold (bias value).

    Returns
    -------
    float
        Current threshold value
    """
    return self.bias.item()
set_threshold
set_threshold(threshold)

Set the anomaly threshold (bias value).

Parameters:

Name Type Description Default
threshold float

New threshold value

required
Source code in cuvis_ai/node/conversion.py
def set_threshold(self, threshold: float) -> None:
    """Set the anomaly threshold (bias value).

    Parameters
    ----------
    threshold : float
        New threshold value
    """
    with torch.no_grad():
        self.bias.fill_(threshold)
predict_anomalies
predict_anomalies(logits)

Convert logits to binary anomaly predictions.

Parameters:

Name Type Description Default
logits Tensor

Logits from forward pass, shape (B, H, W, 1)

required

Returns:

Type Description
Tensor

Binary predictions (0=normal, 1=anomaly), shape (B, H, W, 1)

Source code in cuvis_ai/node/conversion.py
def predict_anomalies(self, logits: torch.Tensor) -> torch.Tensor:
    """Convert logits to binary anomaly predictions.

    Parameters
    ----------
    logits : torch.Tensor
        Logits from forward pass, shape (B, H, W, 1)

    Returns
    -------
    torch.Tensor
        Binary predictions (0=normal, 1=anomaly), shape (B, H, W, 1)
    """
    return (logits > 0).float()

DecisionToMask

Bases: Node

Combine binary decisions and identity labels into a single int32 mask.

The output mask keeps per-pixel identity IDs where the decision is True and sets all non-matching pixels to 0.
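The combination rule is a pointwise multiply; a minimal sketch of the same arithmetic with hand-built tensors:

```python
import torch

# Per-pixel identity IDs [B, H, W] and binary decisions [B, H, W, 1]
identity_mask = torch.tensor([[[3, 7], [0, 5]]], dtype=torch.int32)
decisions = torch.tensor([[[[True], [False]], [[True], [True]]]])

# Keep the identity ID where the decision is True, zero elsewhere
mask = identity_mask.to(torch.int32) * decisions.squeeze(-1).to(torch.int32)
print(mask.tolist())  # [[[3, 0], [0, 5]]]
```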

forward
forward(decisions, identity_mask, **_)

Apply decisions to identities and return the final segmentation mask.

Source code in cuvis_ai/node/conversion.py
@torch.no_grad()
def forward(
    self,
    decisions: torch.Tensor,
    identity_mask: torch.Tensor,
    **_,
) -> dict[str, torch.Tensor]:
    """Apply decisions to identities and return the final segmentation mask."""
    mask = identity_mask.to(torch.int32) * decisions.squeeze(-1).to(torch.int32)
    return {"mask": mask}

Labels

labels

Label Mapping Nodes.

This module provides nodes for converting multi-class segmentation masks to binary anomaly labels. These nodes are useful when training with datasets that have multi-class annotations but the task requires binary anomaly detection.

The main node remaps class IDs to binary labels (0=normal, 1=anomaly) based on configurable normal and anomaly class ID lists.

See Also

cuvis_ai.deciders : Binary decision nodes for threshold-based classification

BinaryAnomalyLabelMapper

BinaryAnomalyLabelMapper(
    normal_class_ids, anomaly_class_ids=None, **kwargs
)

Bases: Node

Convert multi-class segmentation masks to binary anomaly targets.

Masks are remapped to boolean tensors, with False (0) representing normal pixels and True (1) indicating anomalies.

Parameters:

Name Type Description Default
normal_class_ids Iterable[int]

Class IDs that should be considered normal.

required
anomaly_class_ids Iterable[int] | None

Explicit anomaly IDs. When None all IDs not in normal_class_ids are treated as anomalies. When provided, only these IDs are treated as anomalies and all others (including those not in normal_class_ids) are treated as normal.

None
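A minimal sketch of the remapping rule, using torch.isin for the membership test (the node's internal _membership_mask helper is assumed to behave equivalently):

```python
import torch

mask = torch.tensor([[[[0], [1]], [[2], [4]]]])  # [B, H, W, 1] class IDs
normal_ids = torch.tensor([0, 2])

# Default behavior: everything not listed as normal is an anomaly
anomaly = ~torch.isin(mask, normal_ids)
print(anomaly.squeeze(-1).tolist())  # [[[False, True], [False, True]]]

# With explicit anomaly_class_ids=(4,): only ID 4 is an anomaly, the rest is normal
anomaly_ids = torch.tensor([4])
anomaly_explicit = torch.isin(mask, anomaly_ids)
print(anomaly_explicit.squeeze(-1).tolist())  # [[[False, False], [False, True]]]
```

Note how class 1 flips between the two modes: it is anomalous under the default rule but normal once anomaly IDs are listed explicitly.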
Source code in cuvis_ai/node/labels.py
def __init__(
    self,
    normal_class_ids: Iterable[int],
    anomaly_class_ids: Iterable[int] | None = None,
    **kwargs,
) -> None:
    self.normal_class_ids = tuple(int(c) for c in normal_class_ids)
    self.anomaly_class_ids = (
        tuple(int(c) for c in anomaly_class_ids) if anomaly_class_ids is not None else None
    )

    # Validate that there are no overlaps between normal and anomaly class IDs
    if self.anomaly_class_ids is not None:
        overlap = set(self.normal_class_ids) & set(self.anomaly_class_ids)
        if overlap:
            raise ValueError(
                f"Overlap detected between normal_class_ids and anomaly_class_ids: {overlap}. "
                "Class IDs cannot be both normal and anomaly."
            )

        # Check for gaps in coverage and issue warning
        all_specified_ids = set(self.normal_class_ids) | set(self.anomaly_class_ids)
        max_id = max(all_specified_ids) if all_specified_ids else 0

        # Find gaps (missing class IDs)
        expected_ids = set(range(max_id + 1))
        gaps = expected_ids - all_specified_ids

        if gaps:
            warnings.warn(
                f"Gap detected in class ID coverage. The following class IDs are not specified "
                f"in either normal_class_ids or anomaly_class_ids: {gaps}. "
                f"These will be treated as normal classes. To specify all classes explicitly, "
                f"include them in normal_class_ids or anomaly_class_ids.",
                UserWarning,
                stacklevel=2,
            )
            # Add gaps to normal_class_ids as requested
            self.normal_class_ids = tuple(sorted(set(self.normal_class_ids) | gaps))

    self._target_dtype = torch.long

    super().__init__(
        normal_class_ids=self.normal_class_ids,
        anomaly_class_ids=self.anomaly_class_ids,
        **kwargs,
    )
forward
forward(cube, mask, **_)

Map multi-class labels to binary anomaly labels.

Parameters:

Name Type Description Default
cube Tensor

Features/scores to pass through [B, H, W, C]

required
mask Tensor

Multi-class segmentation masks [B, H, W, 1]

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "cube" (pass-through) and "mask" (binary bool) keys

Source code in cuvis_ai/node/labels.py
def forward(self, cube: Tensor, mask: Tensor, **_: Any) -> dict[str, Tensor]:
    """Map multi-class labels to binary anomaly labels.

    Parameters
    ----------
    cube : Tensor
        Features/scores to pass through [B, H, W, C]
    mask : Tensor
        Multi-class segmentation masks [B, H, W, 1]

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "cube" (pass-through) and "mask" (binary bool) keys
    """
    if self.anomaly_class_ids is not None:
        # Explicit anomaly class IDs: only these are anomalies, rest are normal
        mask_anomaly = self._membership_mask(mask, self.anomaly_class_ids)
    else:
        # Original behavior: normal_class_ids are normal, everything else is anomaly
        mask_normal = self._membership_mask(mask, self.normal_class_ids)
        mask_anomaly = ~mask_normal

    mapped = torch.zeros_like(mask, dtype=self._target_dtype, device=mask.device)
    mapped = torch.where(mask_anomaly, torch.ones_like(mapped), mapped)

    # Convert to bool for smaller tensor size
    mapped = mapped.bool()

    return {"cube": cube, "mask": mapped}

Prompt Nodes

prompts

Static nodes and helpers for frame-indexed text, mask, and bbox prompt schedules.

SpatialPromptSpec dataclass

SpatialPromptSpec(object_id, detection_id, frame_id, order)

One scheduled spatial (mask or bbox) prompt entry.

TextPromptSpec dataclass

TextPromptSpec(text, frame_id, order)

One scheduled text-prompt entry.

MaskPrompt

MaskPrompt(json_path, prompt_specs=None, **kwargs)

Bases: Node

Emit a scheduled label-map prompt mask for the requested frame.

Source code in cuvis_ai/node/prompts.py
def __init__(
    self,
    json_path: str,
    prompt_specs: Sequence[str] | None = None,
    **kwargs: Any,
) -> None:
    self.json_path = Path(json_path)
    self._prompt_specs = [str(spec) for spec in (prompt_specs or [])]
    self._masks_by_frame, self._frame_hw_by_id, self._default_hw = load_mask_prompt_schedule(
        self.json_path,
        self._prompt_specs,
    )
    super().__init__(json_path=str(self.json_path), prompt_specs=self._prompt_specs, **kwargs)
forward
forward(frame_id, context=None, **_)

Emit the scheduled prompt label map for frame_id or an empty mask.

Source code in cuvis_ai/node/prompts.py
def forward(
    self,
    frame_id: torch.Tensor,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Emit the scheduled prompt label map for ``frame_id`` or an empty mask."""
    if frame_id is None or frame_id.numel() == 0:
        raise ValueError("MaskPrompt requires a non-empty frame_id input.")

    current_frame_id = int(frame_id.reshape(-1)[0].item())
    frame_hw = _resolve_frame_hw(
        current_frame_id,
        self._frame_hw_by_id,
        self._default_hw,
        self.json_path,
        fallback_on_placeholder=True,
    )
    label_map = self._masks_by_frame.get(current_frame_id)

    if label_map is None:
        mask_t = torch.zeros((1, frame_hw[0], frame_hw[1]), dtype=torch.int32)
    else:
        mask_t = (
            torch.from_numpy(np.array(label_map, copy=True)).unsqueeze(0).to(dtype=torch.int32)
        )
    return {"mask": mask_t}

BBoxPrompt

BBoxPrompt(json_path, prompt_specs=None, **kwargs)

Bases: Node

Emit scheduled runtime bbox prompts plus overlay-friendly debug tensors.

Source code in cuvis_ai/node/prompts.py
def __init__(
    self,
    json_path: str,
    prompt_specs: Sequence[str] | None = None,
    **kwargs: Any,
) -> None:
    self.json_path = Path(json_path)
    self._prompt_specs = [str(spec) for spec in (prompt_specs or [])]
    self._prompts_by_frame, self._frame_hw_by_id, self._default_hw = load_bbox_prompt_schedule(
        self.json_path,
        self._prompt_specs,
    )
    super().__init__(json_path=str(self.json_path), prompt_specs=self._prompt_specs, **kwargs)
forward
forward(frame_id, context=None, **_)

Emit the scheduled bbox prompt list for frame_id or an empty list.

Source code in cuvis_ai/node/prompts.py
def forward(
    self,
    frame_id: torch.Tensor,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, torch.Tensor | list[dict[str, float | int]]]:
    """Emit the scheduled bbox prompt list for ``frame_id`` or an empty list."""
    if frame_id is None or frame_id.numel() == 0:
        raise ValueError("BBoxPrompt requires a non-empty frame_id input.")

    current_frame_id = int(frame_id.reshape(-1)[0].item())
    frame_hw = _resolve_frame_hw(
        current_frame_id,
        self._frame_hw_by_id,
        self._default_hw,
        self.json_path,
    )
    prompts = self._prompts_by_frame.get(current_frame_id, [])
    prompts_out = [dict(prompt) for prompt in prompts]

    if prompts_out:
        boxes_xyxy = torch.tensor(
            [
                [prompt["x_min"], prompt["y_min"], prompt["x_max"], prompt["y_max"]]
                for prompt in prompts_out
            ],
            dtype=torch.float32,
        ).unsqueeze(0)
        object_ids = torch.tensor(
            [int(prompt["object_id"]) for prompt in prompts_out],
            dtype=torch.int64,
        ).unsqueeze(0)
    else:
        boxes_xyxy = torch.zeros((1, 0, 4), dtype=torch.float32)
        object_ids = torch.zeros((1, 0), dtype=torch.int64)

    if frame_hw[0] <= 0 or frame_hw[1] <= 0:
        raise ValueError(
            f"Resolved invalid frame size for frame {current_frame_id}: {frame_hw}."
        )

    return {
        "bboxes": prompts_out,
        "prompt_boxes_xyxy": boxes_xyxy,
        "prompt_object_ids": object_ids,
    }
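
The tensor outputs follow a fixed convention: prompt_boxes_xyxy is [1, N, 4] and prompt_object_ids is [1, N], with N = 0 on frames that have no scheduled prompt. A small numpy sketch of the packing logic (the prompt dicts here are illustrative):

```python
import numpy as np

prompts_out = [
    {"object_id": 1, "x_min": 10.0, "y_min": 20.0, "x_max": 50.0, "y_max": 60.0},
    {"object_id": 2, "x_min": 5.0, "y_min": 5.0, "x_max": 30.0, "y_max": 40.0},
]

def pack_prompts(prompts):
    # Mirror the torch packing above: stack boxes into [1, N, 4] and ids into
    # [1, N], falling back to empty (N = 0) arrays when nothing is scheduled.
    if prompts:
        boxes = np.array(
            [[p["x_min"], p["y_min"], p["x_max"], p["y_max"]] for p in prompts],
            dtype=np.float32,
        )[np.newaxis]
        ids = np.array([int(p["object_id"]) for p in prompts], dtype=np.int64)[np.newaxis]
    else:
        boxes = np.zeros((1, 0, 4), dtype=np.float32)
        ids = np.zeros((1, 0), dtype=np.int64)
    return boxes, ids

boxes, ids = pack_prompts(prompts_out)
print(boxes.shape, ids.shape)      # (1, 2, 4) (1, 2)
print(pack_prompts([])[0].shape)   # (1, 0, 4)
```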

TextPrompt

TextPrompt(
    prompt_specs=None, prompt_mode="scheduled", **kwargs
)

Bases: Node

Emit a runtime text prompt for the requested frame.

Source code in cuvis_ai/node/prompts.py
def __init__(
    self,
    prompt_specs: Sequence[str] | None = None,
    prompt_mode: str = "scheduled",
    **kwargs: Any,
) -> None:
    self._prompt_specs = [str(spec) for spec in (prompt_specs or [])]
    self._prompts_by_frame = load_text_prompt_schedule(self._prompt_specs)
    self._prompt_mode = normalize_text_prompt_mode(prompt_mode)
    super().__init__(
        prompt_specs=self._prompt_specs,
        prompt_mode=self._prompt_mode,
        **kwargs,
    )
forward
forward(frame_id, context=None, **_)

Emit the resolved prompt text for frame_id or an empty string.

Source code in cuvis_ai/node/prompts.py
def forward(
    self,
    frame_id: torch.Tensor,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, str]:
    """Emit the resolved prompt text for ``frame_id`` or an empty string."""
    if frame_id is None or frame_id.numel() == 0:
        raise ValueError("TextPrompt requires a non-empty frame_id input.")

    current_frame_id = int(frame_id.reshape(-1)[0].item())
    return {
        "text_prompt": resolve_text_prompt_for_frame(
            self._prompts_by_frame,
            current_frame_id,
            prompt_mode=self._prompt_mode,
        )
    }

parse_spatial_prompt_spec

parse_spatial_prompt_spec(spec, order=0)

Parse <object_id>:<detection_id>@<frame_id> into a spatial prompt spec.

Source code in cuvis_ai/node/prompts.py
def parse_spatial_prompt_spec(spec: str, order: int = 0) -> SpatialPromptSpec:
    """Parse ``<object_id>:<detection_id>@<frame_id>`` into a spatial prompt spec."""
    match = _PROMPT_SPEC_RE.fullmatch(spec)
    if match is None:
        raise ValueError(
            f"Invalid prompt spec '{spec}'. Expected format <object_id>:<detection_id>@<frame_id>."
        )
    return SpatialPromptSpec(
        object_id=int(match.group(1)),
        detection_id=int(match.group(2)),
        frame_id=int(match.group(3)),
        order=int(order),
    )
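
The spec grammar can be exercised with a small standalone sketch. The regex below is an assumption standing in for the module's private _PROMPT_SPEC_RE, and the tuple return replaces the SpatialPromptSpec dataclass for brevity:

```python
import re

# Assumed pattern for <object_id>:<detection_id>@<frame_id>; the real module
# keeps its own compiled regex (_PROMPT_SPEC_RE).
PROMPT_SPEC_RE = re.compile(r"(\d+):(\d+)@(\d+)")

def parse_spec(spec):
    match = PROMPT_SPEC_RE.fullmatch(spec)
    if match is None:
        raise ValueError(f"Invalid prompt spec '{spec}'.")
    # (object_id, detection_id, frame_id)
    return tuple(int(group) for group in match.groups())

print(parse_spec("1:42@10"))  # (1, 42, 10)
```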

parse_text_prompt_spec

parse_text_prompt_spec(spec, order=0)

Parse <text>@<frame_id> into a typed text spec.

Bare <text> is accepted as a backward-compatible alias for <text>@0.

Source code in cuvis_ai/node/prompts.py
def parse_text_prompt_spec(spec: str, order: int = 0) -> TextPromptSpec:
    """Parse ``<text>@<frame_id>`` into a typed text spec.

    Bare ``<text>`` is accepted as a backward-compatible alias for ``<text>@0``.
    """
    if not isinstance(spec, str):
        raise ValueError(f"Text prompt spec must be a string, got {type(spec).__name__}.")

    raw_spec = spec.strip()
    if not raw_spec:
        raise ValueError("Text prompt spec must be non-empty.")

    if "@" in raw_spec:
        prompt_text, frame_part = raw_spec.rsplit("@", maxsplit=1)
        prompt_text = prompt_text.strip()
        if not frame_part.strip().isdigit():
            raise ValueError(
                f"Invalid text prompt spec '{spec}'. Expected format <text>@<frame_id>."
            )
        frame_id = int(frame_part.strip())
    else:
        prompt_text = raw_spec
        frame_id = 0

    if not prompt_text:
        raise ValueError(f"Invalid text prompt spec '{spec}'. Prompt text must not be empty.")
    if frame_id < 0:
        raise ValueError(f"Invalid text prompt spec '{spec}'. frame_id must be zero or positive.")

    return TextPromptSpec(text=prompt_text, frame_id=frame_id, order=int(order))
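
Because the parser splits on the last "@" (rsplit), the prompt text itself may contain "@" characters. A condensed sketch of that behavior, returning a plain (text, frame_id) tuple instead of TextPromptSpec:

```python
def parse_text_spec(spec):
    raw = spec.strip()
    if "@" in raw:
        # rsplit on the last '@' keeps any '@' inside the prompt text intact
        text, frame_part = raw.rsplit("@", maxsplit=1)
        if not frame_part.strip().isdigit():
            raise ValueError(f"Invalid text prompt spec '{spec}'.")
        return text.strip(), int(frame_part.strip())
    return raw, 0  # bare text is an alias for <text>@0

print(parse_text_spec("red car@15"))   # ('red car', 15)
print(parse_text_spec("user@host@3"))  # ('user@host', 3)
print(parse_text_spec("red car"))      # ('red car', 0)
```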

load_text_prompt_schedule

load_text_prompt_schedule(prompt_specs)

Build a per-frame text prompt schedule.

Multiple prompt frames are allowed. V1 rejects multiple distinct texts on the same frame.

Source code in cuvis_ai/node/prompts.py
def load_text_prompt_schedule(prompt_specs: Sequence[str] | None) -> dict[int, str]:
    """Build a per-frame text prompt schedule.

    Multiple prompt frames are allowed. V1 rejects multiple distinct texts on the
    same frame.
    """
    prompts_by_frame: dict[int, str] = {}
    for order, raw_spec in enumerate(prompt_specs or []):
        spec = parse_text_prompt_spec(raw_spec, order=order)
        existing = prompts_by_frame.get(spec.frame_id)
        if existing is not None and existing != spec.text:
            raise ValueError(
                "Multiple distinct text prompts on the same frame are not supported: "
                f"frame {spec.frame_id} has both '{existing}' and '{spec.text}'."
            )
        prompts_by_frame[spec.frame_id] = spec.text
    return prompts_by_frame
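
The conflict rule means repeating an identical text on the same frame is idempotent, while two distinct texts on one frame raise. A sketch of that dedup logic, taking pre-parsed (frame_id, text) pairs instead of raw specs:

```python
def build_schedule(entries):
    # entries: iterable of (frame_id, text). Duplicate identical texts on a
    # frame are idempotent; distinct texts on the same frame conflict.
    schedule = {}
    for frame_id, text in entries:
        existing = schedule.get(frame_id)
        if existing is not None and existing != text:
            raise ValueError(f"Conflicting text prompts on frame {frame_id}.")
        schedule[frame_id] = text
    return schedule

print(build_schedule([(0, "red car"), (0, "red car"), (5, "blue truck")]))
# {0: 'red car', 5: 'blue truck'}
```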

normalize_text_prompt_mode

normalize_text_prompt_mode(prompt_mode)

Normalize and validate the text-prompt emission mode.

Source code in cuvis_ai/node/prompts.py
def normalize_text_prompt_mode(prompt_mode: str) -> str:
    """Normalize and validate the text-prompt emission mode."""
    normalized = str(prompt_mode).strip().lower()
    if normalized not in _TEXT_PROMPT_MODES:
        raise ValueError(
            f"Unsupported text prompt mode '{prompt_mode}'. "
            f"Expected one of {sorted(_TEXT_PROMPT_MODES)}."
        )
    return normalized

resolve_text_prompt_for_frame

resolve_text_prompt_for_frame(
    prompts_by_frame, frame_id, *, prompt_mode="scheduled"
)

Resolve the runtime text prompt for frame_id.

scheduled emits a prompt only on its exact scheduled frame; repeat keeps the most recent scheduled prompt active until a later one replaces it.

Source code in cuvis_ai/node/prompts.py
def resolve_text_prompt_for_frame(
    prompts_by_frame: dict[int, str],
    frame_id: int,
    *,
    prompt_mode: str = "scheduled",
) -> str:
    """Resolve the runtime text prompt for ``frame_id``.

    ``scheduled`` emits only on exact prompt frames.
    ``repeat`` keeps the latest scheduled prompt active until replaced.
    """
    current_frame_id = int(frame_id)
    normalized_mode = normalize_text_prompt_mode(prompt_mode)
    if normalized_mode == "scheduled":
        return str(prompts_by_frame.get(current_frame_id, ""))

    latest_prompt = ""
    latest_frame_id: int | None = None
    for scheduled_frame_id, prompt_text in prompts_by_frame.items():
        scheduled_frame_id = int(scheduled_frame_id)
        if scheduled_frame_id > current_frame_id:
            continue
        if latest_frame_id is None or scheduled_frame_id > latest_frame_id:
            latest_frame_id = scheduled_frame_id
            latest_prompt = str(prompt_text)
    return latest_prompt
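
The two modes differ only on frames between scheduled prompts. A condensed sketch (normalization and string coercion omitted) makes the contrast concrete:

```python
def resolve(schedule, frame_id, mode="scheduled"):
    # 'scheduled' fires only on exact prompt frames; 'repeat' holds the most
    # recent prompt scheduled at or before frame_id.
    if mode == "scheduled":
        return schedule.get(frame_id, "")
    latest, latest_frame = "", None
    for scheduled_frame, text in schedule.items():
        if scheduled_frame <= frame_id and (latest_frame is None or scheduled_frame > latest_frame):
            latest_frame, latest = scheduled_frame, text
    return latest

schedule = {0: "red car", 10: "blue truck"}
print(resolve(schedule, 3, "scheduled"))  # '' (nothing scheduled on frame 3)
print(resolve(schedule, 3, "repeat"))     # 'red car'
print(resolve(schedule, 12, "repeat"))    # 'blue truck'
```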

load_detection_index

load_detection_index(json_path)

Load a flat COCO or track-centric SAM3 detection JSON into frame-indexed metadata.

Source code in cuvis_ai/node/prompts.py
def load_detection_index(
    json_path: str | Path,
) -> tuple[
    dict[int, list[dict[str, Any]]],
    dict[int, tuple[int, int]],
    tuple[int, int] | None,
]:
    """Load a flat COCO or track-centric SAM3 detection JSON into frame-indexed metadata."""
    json_file = Path(json_path)
    if not json_file.exists():
        raise FileNotFoundError(f"Detection JSON not found: {json_file}")

    with json_file.open("r", encoding="utf-8") as handle:
        data = json.load(handle)

    return _build_frame_metadata(data)

load_mask_prompt_schedule

load_mask_prompt_schedule(json_path, prompt_specs)

Load detection JSON and build per-frame label-map prompts.

Source code in cuvis_ai/node/prompts.py
def load_mask_prompt_schedule(
    json_path: str | Path,
    prompt_specs: Sequence[str] | None,
) -> tuple[dict[int, np.ndarray], dict[int, tuple[int, int]], tuple[int, int] | None]:
    """Load detection JSON and build per-frame label-map prompts."""
    annotations_by_frame, frame_hw_by_id, default_hw = load_detection_index(json_path)

    masks_by_frame: dict[int, np.ndarray] = {}
    for order, raw_spec in enumerate(prompt_specs or []):
        spec = parse_spatial_prompt_spec(raw_spec, order=order)
        annotation = _select_annotation_for_prompt(
            annotations_by_frame=annotations_by_frame,
            detection_id=spec.detection_id,
            frame_id=spec.frame_id,
        )
        if "segmentation" not in annotation:
            raise ValueError(
                f"Annotation selected by '{raw_spec}' does not contain a 'segmentation' field."
            )

        frame_hw = _resolve_prompt_frame_hw(
            spec.frame_id,
            frame_hw_by_id,
            default_hw,
            raw_spec=raw_spec,
        )
        binary_mask = _decode_segmentation(
            annotation["segmentation"],
            frame_hw,
            frame_id=spec.frame_id,
        )
        if int(np.count_nonzero(binary_mask)) == 0:
            raise ValueError(f"Annotation selected by '{raw_spec}' has an empty segmentation mask.")
        frame_mask = masks_by_frame.setdefault(
            spec.frame_id,
            np.zeros(frame_hw, dtype=np.int32),
        )
        frame_mask[binary_mask.astype(bool)] = int(spec.object_id)

    return masks_by_frame, frame_hw_by_id, default_hw
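
Each spec paints its object_id into a shared per-frame label map, so where two decoded masks overlap, the later spec wins. A numpy sketch of that painting loop with two illustrative masks:

```python
import numpy as np

frame_hw = (4, 4)
label_map = np.zeros(frame_hw, dtype=np.int32)

mask_a = np.zeros(frame_hw, dtype=bool)
mask_a[:, :3] = True   # object 1 covers columns 0-2
mask_b = np.zeros(frame_hw, dtype=bool)
mask_b[:, 2:] = True   # object 2 covers columns 2-3, overlapping column 2

# Mirror the setdefault-then-paint loop above: specs are applied in order,
# so the later object_id overwrites the earlier one on overlapping pixels.
for object_id, binary_mask in [(1, mask_a), (2, mask_b)]:
    label_map[binary_mask] = object_id

print(label_map[0])  # [1 1 2 2]
```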

load_bbox_prompt_schedule

load_bbox_prompt_schedule(json_path, prompt_specs)

Load detection JSON and build per-frame bbox prompts.

Source code in cuvis_ai/node/prompts.py
def load_bbox_prompt_schedule(
    json_path: str | Path,
    prompt_specs: Sequence[str] | None,
) -> tuple[
    dict[int, list[dict[str, float | int]]],
    dict[int, tuple[int, int]],
    tuple[int, int] | None,
]:
    """Load detection JSON and build per-frame bbox prompts."""
    annotations_by_frame, frame_hw_by_id, default_hw = load_detection_index(json_path)

    prompts_by_frame: dict[int, dict[int, dict[str, float | int]]] = {}
    for order, raw_spec in enumerate(prompt_specs or []):
        spec = parse_spatial_prompt_spec(raw_spec, order=order)
        annotation = _select_annotation_for_prompt(
            annotations_by_frame=annotations_by_frame,
            detection_id=spec.detection_id,
            frame_id=spec.frame_id,
        )
        frame_hw = _resolve_prompt_frame_hw(
            spec.frame_id,
            frame_hw_by_id,
            default_hw,
            raw_spec=raw_spec,
        )
        x_min, y_min, x_max, y_max = _annotation_bbox_xyxy(
            annotation,
            frame_hw,
            raw_spec=raw_spec,
        )
        prompts_by_frame.setdefault(spec.frame_id, {})[spec.object_id] = {
            "element_id": 0,
            "object_id": int(spec.object_id),
            "x_min": float(x_min),
            "y_min": float(y_min),
            "x_max": float(x_max),
            "y_max": float(y_max),
        }

    return (
        {frame_id: list(object_map.values()) for frame_id, object_map in prompts_by_frame.items()},
        frame_hw_by_id,
        default_hw,
    )
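
Because prompts are keyed per frame by object_id, a later spec for the same (frame, object) pair silently replaces the earlier one while keeping its position in the emitted list. A pure-Python sketch with illustrative entries:

```python
prompts_by_frame = {}

entries = [
    # (frame_id, object_id, bbox_xyxy); the second entry re-specifies object 1
    # on frame 0 and therefore replaces the first.
    (0, 1, (10.0, 10.0, 50.0, 50.0)),
    (0, 1, (12.0, 12.0, 48.0, 48.0)),
    (0, 2, (5.0, 5.0, 20.0, 20.0)),
]
for frame_id, object_id, (x0, y0, x1, y1) in entries:
    prompts_by_frame.setdefault(frame_id, {})[object_id] = {
        "object_id": object_id, "x_min": x0, "y_min": y0, "x_max": x1, "y_max": y1,
    }

frame0 = list(prompts_by_frame[0].values())
print(len(frame0))         # 2 prompts: one per object id
print(frame0[0]["x_min"])  # 12.0 (the later spec replaced the first)
```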