Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Preprocessing Nodes

Current preprocessing covers normalization, geometric transforms, ROI cropping, and synthetic occlusion helpers.

Normalization

normalization

Differentiable normalization nodes for BHWC hyperspectral data.

This module provides a collection of normalization nodes designed for hyperspectral imaging pipelines. All normalizers operate on BHWC format ([batch, height, width, channels]) and maintain gradient flow for end-to-end training.

Normalization strategies:

  • MinMaxNormalizer: Scales data to [0, 1] range using min-max statistics
  • ZScoreNormalizer: Standardizes data to zero mean and unit variance
  • SigmoidNormalizer: Applies sigmoid transformation with median centering
  • PerPixelUnitNorm: L2 normalization per pixel across channels
  • IdentityNormalizer: No-op passthrough for testing or baseline comparisons
  • SigmoidTransform: General-purpose sigmoid for logits→probabilities

Why Normalize?

Normalization is critical for stable anomaly detection and deep learning:

  1. Stable covariance estimation: RX detectors require well-conditioned covariance matrices
  2. Gradient stability: Prevents exploding/vanishing gradients during training
  3. Comparable scales: Ensures different spectral ranges contribute equally
  4. Faster convergence: Accelerates gradient-based optimization

BHWC Format Requirement

All normalizers expect BHWC input format. For HWC tensors, add batch dimension:

hwc_tensor = torch.randn(256, 256, 61)  # [H, W, C]
bhwc_tensor = hwc_tensor.unsqueeze(0)   # [1, H, W, C]

IdentityNormalizer

IdentityNormalizer(**kwargs)

Bases: _ScoreNormalizerBase

No-op normalizer; preserves incoming scores.

Source code in cuvis_ai/node/normalization.py
def __init__(self, **kwargs) -> None:
    super().__init__(**kwargs)

MinMaxNormalizer

MinMaxNormalizer(
    eps=1e-06, use_running_stats=True, **kwargs
)

Bases: _ScoreNormalizerBase

Min-max normalization per sample and channel (keeps gradients).

Scales data to [0, 1] range using (x - min) / (max - min) transformation. Can operate in two modes:

  1. Per-sample normalization (use_running_stats=False): min/max computed per batch
  2. Global normalization (use_running_stats=True): uses running statistics from statistical initialization

Parameters:

Name Type Description Default
eps float

Small constant for numerical stability, prevents division by zero (default: 1e-6)

1e-06
use_running_stats bool

If True, use global min/max from statistical_initialization(). If False, compute min/max per batch during forward pass (default: True)

True
**kwargs dict

Additional arguments passed to Node base class

{}

Attributes:

Name Type Description
running_min Tensor

Global minimum value computed during statistical initialization

running_max Tensor

Global maximum value computed during statistical initialization

Examples:

>>> from cuvis_ai.node.normalization import MinMaxNormalizer
>>> from cuvis_ai_core.training import StatisticalTrainer
>>> import torch
>>>
>>> # Mode 1: Global normalization with statistical initialization
>>> normalizer = MinMaxNormalizer(eps=1.0e-6, use_running_stats=True)
>>> stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
>>> stat_trainer.fit()  # Computes global min/max from training data
>>>
>>> # Inference uses global statistics
>>> output = normalizer.forward(data=hyperspectral_cube)
>>> normalized = output["normalized"]  # [B, H, W, C], values in [0, 1]
>>>
>>> # Mode 2: Per-sample normalization (no initialization required)
>>> normalizer_local = MinMaxNormalizer(use_running_stats=False)
>>> output = normalizer_local.forward(data=hyperspectral_cube)
>>> # Each sample normalized independently using its own min/max
See Also

ZScoreNormalizer : Z-score standardization
SigmoidNormalizer : Sigmoid-based normalization
docs/tutorials/rx-statistical.md : RX pipeline with MinMaxNormalizer

Notes

Global normalization (use_running_stats=True) is recommended for RX detectors to ensure consistent scaling between training and inference. Per-sample normalization can be useful for real-time processing when training data is unavailable.

Source code in cuvis_ai/node/normalization.py
def __init__(self, eps: float = 1e-6, use_running_stats: bool = True, **kwargs) -> None:
    self.eps = float(eps)
    self.use_running_stats = use_running_stats
    super().__init__(eps=eps, use_running_stats=use_running_stats, **kwargs)

    # Running statistics for global normalization
    self.register_buffer("running_min", torch.tensor(float("nan")))
    self.register_buffer("running_max", torch.tensor(float("nan")))

    # Only require initialization when running stats are requested
    self._requires_initial_fit_override = self.use_running_stats
statistical_initialization
statistical_initialization(input_stream)

Compute global min/max from data iterator.

Parameters:

Name Type Description Default
input_stream InputStream

Iterator yielding dicts matching INPUT_SPECS (port-based format) Expected format: {"data": tensor} where tensor is the scores/data

required
Source code in cuvis_ai/node/normalization.py
def statistical_initialization(self, input_stream) -> None:
    """Compute global min/max from data iterator.

    Parameters
    ----------
    input_stream : InputStream
        Iterator yielding dicts matching INPUT_SPECS (port-based format)
        Expected format: {"data": tensor} where tensor is the scores/data
    """
    # Reset previous running statistics before recomputing.
    self.running_min.fill_(float("nan"))
    self.running_max.fill_(float("nan"))
    self._statistically_initialized = False

    all_mins = []
    all_maxs = []

    for batch_data in input_stream:
        # Extract data from port-based dict
        x = batch_data.get("data")
        if x is not None:
            # Flatten spatial dimensions
            flat = x.reshape(x.shape[0], -1)
            batch_min = flat.min()
            batch_max = flat.max()
            all_mins.append(batch_min)
            all_maxs.append(batch_max)

    if not all_mins:
        raise RuntimeError(
            "MinMaxNormalizer.statistical_initialization() did not receive any data."
        )

    self.running_min.copy_(torch.stack(all_mins).min())
    self.running_max.copy_(torch.stack(all_maxs).max())
    self._statistically_initialized = True
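The accumulation logic above can be exercised standalone; the list below stands in for an `InputStream` of port-based dicts (hypothetical data, not the library API):

```python
import torch

# Hypothetical stand-in for an InputStream: a list of port-based dicts.
batches = [
    {"data": torch.tensor([[1.0, 5.0], [2.0, 3.0]])},
    {"data": torch.tensor([[-4.0, 0.5], [7.0, 2.0]])},
]

all_mins, all_maxs = [], []
for batch in batches:
    x = batch.get("data")
    if x is not None:
        flat = x.reshape(x.shape[0], -1)  # flatten everything but the batch dim
        all_mins.append(flat.min())
        all_maxs.append(flat.max())

running_min = torch.stack(all_mins).min()  # global minimum over all batches
running_max = torch.stack(all_maxs).max()  # global maximum over all batches
```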

SigmoidNormalizer

SigmoidNormalizer(std_floor=1e-06, **kwargs)

Bases: _ScoreNormalizerBase

Median-centered sigmoid squashing per sample and channel.

Applies sigmoid transformation centered at the median with standard deviation scaling:

sigmoid((x - median) / std)

Produces values in [0, 1] range with median mapped to 0.5.

Parameters:

Name Type Description Default
std_floor float

Minimum standard deviation threshold to prevent division by zero (default: 1e-6)

1e-06
**kwargs dict

Additional arguments passed to Node base class

{}

Examples:

>>> from cuvis_ai.node.normalization import SigmoidNormalizer
>>> import torch
>>>
>>> # Create sigmoid normalizer
>>> normalizer = SigmoidNormalizer(std_floor=1.0e-6)
>>>
>>> # Apply to hyperspectral data
>>> data = torch.randn(4, 256, 256, 61)  # [B, H, W, C]
>>> output = normalizer.forward(data=data)
>>> normalized = output["normalized"]  # [4, 256, 256, 61], values in [0, 1]
See Also

MinMaxNormalizer : Min-max scaling to [0, 1]
ZScoreNormalizer : Z-score standardization

Notes

Sigmoid normalization is robust to outliers because extreme values are squashed asymptotically to 0 or 1. This makes it suitable for data with heavy-tailed distributions or sporadic anomalies.

Source code in cuvis_ai/node/normalization.py
def __init__(self, std_floor: float = 1e-6, **kwargs) -> None:
    self.std_floor = float(std_floor)
    super().__init__(std_floor=std_floor, **kwargs)
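A standalone sketch of the formula above, assuming statistics are taken per sample and channel over the spatial dimensions (not the node's exact implementation):

```python
import torch

def median_sigmoid(x: torch.Tensor, std_floor: float = 1e-6) -> torch.Tensor:
    # Per-sample, per-channel median and std over the spatial dims of a BHWC tensor.
    flat = x.flatten(1, 2)                             # [B, H*W, C]
    median = flat.median(dim=1, keepdim=True).values   # [B, 1, C]
    std = flat.std(dim=1, keepdim=True).clamp_min(std_floor)
    out = torch.sigmoid((flat - median) / std)         # median maps to 0.5
    return out.reshape_as(x)

x = torch.randn(2, 8, 8, 3)
y = median_sigmoid(x)
```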

ZScoreNormalizer

ZScoreNormalizer(
    dims=None, eps=1e-06, keepdim=True, **kwargs
)

Bases: _ScoreNormalizerBase

Z-score (standardization) normalization along specified dimensions.

Computes: (x - mean) / (std + eps) along specified dims. Per-sample normalization with no statistical initialization required.

Parameters:

Name Type Description Default
dims list[int]

Dimensions to compute statistics over (default: [1,2] for H,W in BHWC format)

None
eps float

Small constant for numerical stability (default: 1e-6)

1e-06
keepdim bool

Whether to keep reduced dimensions (default: True)

True

Examples:

>>> # Normalize over spatial dimensions (H, W)
>>> zscore = ZScoreNormalizer(dims=[1, 2])
>>>
>>> # Normalize over all spatial and channel dimensions
>>> zscore_all = ZScoreNormalizer(dims=[1, 2, 3])
Source code in cuvis_ai/node/normalization.py
def __init__(
    self, dims: list[int] | None = None, eps: float = 1e-6, keepdim: bool = True, **kwargs
) -> None:
    self.dims = dims if dims is not None else [1, 2]
    self.eps = float(eps)
    self.keepdim = keepdim
    super().__init__(dims=self.dims, eps=eps, keepdim=keepdim, **kwargs)
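The computation reduces to a few lines of plain PyTorch; a sketch under the default `dims=[1, 2]`:

```python
import torch

def zscore(x: torch.Tensor, dims=(1, 2), eps: float = 1e-6, keepdim: bool = True):
    # Standardize over the given dims: zero mean, (approximately) unit variance.
    mean = x.mean(dim=dims, keepdim=keepdim)
    std = x.std(dim=dims, keepdim=keepdim)
    return (x - mean) / (std + eps)

x = torch.randn(2, 16, 16, 5)  # [B, H, W, C]
z = zscore(x)                  # each (sample, channel) slice is standardized over H, W
```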

SigmoidTransform

SigmoidTransform(**kwargs)

Bases: Node

Applies sigmoid transformation to convert logits to probabilities [0,1].

General-purpose sigmoid node for converting raw scores/logits to probability space. Useful for visualization or downstream nodes that expect bounded [0,1] values.

Examples:

>>> sigmoid = SigmoidTransform()
>>> # Route logits to both loss (raw) and visualization (sigmoid)
>>> graph.connect(
...     (rx.scores, loss_node.predictions),  # Raw logits to loss
...     (rx.scores, sigmoid.data),           # Logits to sigmoid
...     (sigmoid.transformed, viz.scores),   # Probabilities to viz
... )
Source code in cuvis_ai/node/normalization.py
def __init__(self, **kwargs) -> None:
    super().__init__(**kwargs)
forward
forward(data, **_)

Apply sigmoid transformation.

Parameters:

Name Type Description Default
data Tensor

Input tensor

required

Returns:

Type Description
dict[str, Tensor]

Dictionary with "transformed" key containing sigmoid output

Source code in cuvis_ai/node/normalization.py
def forward(self, data: Tensor, **_: Any) -> dict[str, Tensor]:
    """Apply sigmoid transformation.

    Parameters
    ----------
    data : Tensor
        Input tensor

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "transformed" key containing sigmoid output
    """
    return {"transformed": torch.sigmoid(data)}

PerPixelUnitNorm

PerPixelUnitNorm(eps=1e-08, **kwargs)

Bases: _ScoreNormalizerBase

Per-pixel mean-centering and L2 normalization across channels.

Source code in cuvis_ai/node/normalization.py
def __init__(self, eps: float = 1e-8, **kwargs) -> None:
    self.eps = float(eps)
    super().__init__(eps=self.eps, **kwargs)
forward
forward(data, **_)

Normalize BHWC tensors per pixel.

Source code in cuvis_ai/node/normalization.py
def forward(self, data: Tensor, **_: Any) -> dict[str, Tensor]:
    """Normalize BHWC tensors per pixel."""
    normalized = self._normalize(data)
    return {"normalized": normalized}

Preprocessors

preprocessors

Preprocessing Nodes.

This module provides nodes for preprocessing hyperspectral data, including wavelength-based band selection and filtering. These nodes help reduce dimensionality and focus analysis on specific spectral regions of interest.

See Also

cuvis_ai.node.channel_selector : Advanced channel selection methods
cuvis_ai.node.normalization : Normalization and standardization nodes

BandpassByWavelength

BandpassByWavelength(
    min_wavelength_nm, max_wavelength_nm=None, **kwargs
)

Bases: Node

Select channels by wavelength interval from BHWC tensors.

This node filters hyperspectral data by keeping only channels within a specified wavelength range. Wavelengths must be provided via the input port.

Parameters:

Name Type Description Default
min_wavelength_nm float

Minimum wavelength (inclusive) to keep, in nanometers

required
max_wavelength_nm float | None

Maximum wavelength (inclusive) to keep. If None, selects all wavelengths >= min_wavelength_nm. Default: None

None

Examples:

>>> # Create bandpass node
>>> bandpass = BandpassByWavelength(
...     min_wavelength_nm=500.0,
...     max_wavelength_nm=700.0,
... )
>>> # Filter cube in BHWC format with wavelengths from input port
>>> wavelengths_tensor = torch.from_numpy(wavelengths).float()
>>> filtered = bandpass.forward(data=cube_bhwc, wavelengths=wavelengths_tensor)["filtered"]
>>>
>>> # For single HWC images, add a batch dimension first:
>>> # filtered = bandpass.forward(data=cube_hwc.unsqueeze(0), wavelengths=wavelengths_tensor)["filtered"]
>>>
>>> # Use with wavelengths from upstream node
>>> pipeline.connect(
...     (data_node.outputs.cube, bandpass.data),
...     (data_node.outputs.wavelengths, bandpass.wavelengths),
... )
Source code in cuvis_ai/node/preprocessors.py
def __init__(
    self,
    min_wavelength_nm: float,
    max_wavelength_nm: float | None = None,
    **kwargs,
) -> None:
    self.min_wavelength_nm = float(min_wavelength_nm)
    self.max_wavelength_nm = float(max_wavelength_nm) if max_wavelength_nm is not None else None

    super().__init__(
        min_wavelength_nm=self.min_wavelength_nm,
        max_wavelength_nm=self.max_wavelength_nm,
        **kwargs,
    )
forward
forward(data, wavelengths, **kwargs)

Filter cube by wavelength range.

Parameters:

Name Type Description Default
data Tensor

Input hyperspectral cube [B, H, W, C].

required
wavelengths Tensor

Wavelengths tensor [C] in nanometers.

required
**kwargs Any

Additional keyword arguments (unused).

{}

Returns:

Type Description
dict[str, Tensor]

Dictionary with "filtered" key containing filtered cube [B, H, W, C_filtered]

Raises:

Type Description
ValueError

If no channels are selected by the provided wavelength range

Source code in cuvis_ai/node/preprocessors.py
def forward(self, data: Tensor, wavelengths: Tensor, **kwargs: Any) -> dict[str, Tensor]:
    """Filter cube by wavelength range.

    Parameters
    ----------
    data : Tensor
        Input hyperspectral cube [B, H, W, C].
    wavelengths : Tensor
        Wavelengths tensor [C] in nanometers.
    **kwargs : Any
        Additional keyword arguments (unused).

    Returns
    -------
    dict[str, Tensor]
        Dictionary with "filtered" key containing filtered cube [B, H, W, C_filtered]

    Raises
    ------
    ValueError
        If no channels are selected by the provided wavelength range
    """

    # Create mask for wavelength range
    if self.max_wavelength_nm is None:
        keep_mask = wavelengths >= self.min_wavelength_nm
    else:
        keep_mask = (wavelengths >= self.min_wavelength_nm) & (
            wavelengths <= self.max_wavelength_nm
        )

    if keep_mask.sum().item() == 0:
        raise ValueError("No channels selected by the provided wavelength range")

    # Filter cube
    filtered = data[..., keep_mask]

    return {"filtered": filtered}

SpatialRotateNode

SpatialRotateNode(rotation=None, **kwargs)

Bases: Node

Rotate spatial dimensions of cubes, masks, and RGB images.

Applies a fixed rotation (90, -90, or 180 degrees) to the H and W dimensions of all provided inputs. Wavelengths pass through unchanged.

Place immediately after a data node so all downstream consumers see correctly oriented data.

Parameters:

Name Type Description Default
rotation int | None

Rotation in degrees. Supported: 90, -90, 180 (and aliases 270, -270, -180). None or 0 means passthrough.

None
Source code in cuvis_ai/node/preprocessors.py
def __init__(self, rotation: int | None = None, **kwargs: Any) -> None:
    if rotation not in self._VALID_ROTATIONS:
        raise ValueError(
            f"rotation must be one of {sorted(r for r in self._VALID_ROTATIONS if r is not None)}"
            f" or None, got {rotation}"
        )
    self.rotation = self._normalize(rotation)
    super().__init__(rotation=rotation, **kwargs)
forward
forward(cube, mask=None, rgb_image=None, **_)

Apply the configured rotation to the cube, mask, and rgb_image tensors.

Source code in cuvis_ai/node/preprocessors.py
@torch.no_grad()
def forward(
    self,
    cube: Tensor,
    mask: Tensor | None = None,
    rgb_image: Tensor | None = None,
    **_: Any,
) -> dict[str, Tensor]:
    """Apply the configured rotation to the cube, mask, and rgb_image tensors."""
    k = {None: 0, 90: 1, -90: -1, 180: 2}[self.rotation]

    result: dict[str, Tensor] = {}
    result["cube"] = torch.rot90(cube, k=k, dims=(1, 2)).contiguous() if k else cube
    if mask is not None:
        result["mask"] = torch.rot90(mask, k=k, dims=(1, 2)).contiguous() if k else mask
    if rgb_image is not None:
        result["rgb_image"] = (
            torch.rot90(rgb_image, k=k, dims=(1, 2)).contiguous() if k else rgb_image
        )
    return result
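The rotation mapping above (90 → k=1, -90 → k=-1, 180 → k=2) can be checked directly with `torch.rot90` on a BHWC tensor; `dims=(1, 2)` rotates H and W:

```python
import torch

cube = torch.arange(2 * 3 * 4 * 1, dtype=torch.float32).reshape(2, 3, 4, 1)  # [B, H, W, C]

# A 90-degree rotation (k=1) swaps the spatial dims: [B, H, W, C] -> [B, W, H, C].
rotated_90 = torch.rot90(cube, k=1, dims=(1, 2)).contiguous()

# A 180-degree rotation (k=2) preserves the spatial shape.
rotated_180 = torch.rot90(cube, k=2, dims=(1, 2)).contiguous()
```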

BBoxRoiCropNode

BBoxRoiCropNode(
    output_size=(256, 128), aligned=True, **kwargs
)

Bases: Node

Differentiable bbox cropping via torchvision roi_align.

Accepts BHWC images and xyxy bboxes, outputs NCHW crops resized to a fixed output_size. Padding rows (all coords <= 0) are filtered out, so the output N equals the number of valid detections.

Parameters:

Name Type Description Default
output_size tuple[int, int]

Target crop size (H, W) for roi_align.

(256, 128)
aligned bool

Use sub-pixel aligned roi_align (recommended).

True
Source code in cuvis_ai/node/preprocessors.py
def __init__(
    self,
    output_size: tuple[int, int] = (256, 128),
    aligned: bool = True,
    **kwargs: Any,
) -> None:
    self.output_size = tuple(output_size)
    self.aligned = bool(aligned)
    super().__init__(output_size=list(output_size), aligned=aligned, **kwargs)
forward
forward(images, bboxes, **_)

Crop and resize bounding-box regions from images.

Parameters:

Name Type Description Default
images Tensor

[B, H, W, C] float32, values in [0, 1].

required
bboxes Tensor

[B, N_padded, 4] float32 xyxy pixel coordinates.

required

Returns:

Type Description
dict

{"crops": Tensor [N, C, crop_h, crop_w]}

Source code in cuvis_ai/node/preprocessors.py
def forward(self, images: Tensor, bboxes: Tensor, **_: Any) -> dict[str, Tensor]:
    """Crop and resize bounding-box regions from images.

    Parameters
    ----------
    images : Tensor
        ``[B, H, W, C]`` float32, values in [0, 1].
    bboxes : Tensor
        ``[B, N_padded, 4]`` float32 xyxy pixel coordinates.

    Returns
    -------
    dict
        ``{"crops": Tensor [N, C, crop_h, crop_w]}``
    """
    from torchvision.ops import roi_align

    B, _H, _W, C = images.shape
    crop_h, crop_w = self.output_size

    # BHWC → BCHW
    images_bchw = images.permute(0, 3, 1, 2).contiguous()

    # Build batch indices and flatten bboxes
    N_padded = bboxes.shape[1]
    batch_idx = (
        torch.arange(B, device=bboxes.device).unsqueeze(1).expand(B, N_padded).reshape(-1)
    )
    flat_bboxes = bboxes.reshape(-1, 4)  # [B*N_padded, 4]

    # Filter padding rows (all coords <= 0)
    valid_mask = (flat_bboxes > 0).any(dim=1)
    valid_bboxes = flat_bboxes[valid_mask]
    valid_batch_idx = batch_idx[valid_mask]

    N = valid_bboxes.shape[0]
    if N == 0:
        return {
            "crops": torch.empty(0, C, crop_h, crop_w, device=images.device, dtype=images.dtype)
        }

    # Build [N, 5] roi tensor: [batch_index, x1, y1, x2, y2]
    rois = torch.cat([valid_batch_idx.unsqueeze(1).to(valid_bboxes.dtype), valid_bboxes], dim=1)

    crops = roi_align(
        images_bchw,
        rois,
        output_size=self.output_size,
        spatial_scale=1.0,
        aligned=self.aligned,
    )

    return {"crops": crops}

ChannelNormalizeNode

ChannelNormalizeNode(
    mean=IMAGENET_MEAN, std=IMAGENET_STD, **kwargs
)

Bases: Node

Per-channel mean/std normalization for NCHW tensors.

Defaults to ImageNet statistics but accepts any per-channel values.

Parameters:

Name Type Description Default
mean tuple[float, ...]

Per-channel mean.

IMAGENET_MEAN
std tuple[float, ...]

Per-channel std.

IMAGENET_STD
Source code in cuvis_ai/node/preprocessors.py
def __init__(
    self,
    mean: tuple[float, ...] = IMAGENET_MEAN,
    std: tuple[float, ...] = IMAGENET_STD,
    **kwargs: Any,
) -> None:
    self._mean_vals = tuple(float(v) for v in mean)
    self._std_vals = tuple(float(v) for v in std)

    super().__init__(mean=list(self._mean_vals), std=list(self._std_vals), **kwargs)

    # Register as buffers so they auto-move with .to(device)
    self.register_buffer(
        "_mean_buf",
        torch.tensor(self._mean_vals, dtype=torch.float32).view(1, -1, 1, 1),
    )
    self.register_buffer(
        "_std_buf",
        torch.tensor(self._std_vals, dtype=torch.float32).view(1, -1, 1, 1),
    )
forward
forward(images, **_)

Normalize images per channel.

Parameters:

Name Type Description Default
images Tensor

[N, C, H, W] float32.

required

Returns:

Type Description
dict

{"normalized": Tensor [N, C, H, W]}

Source code in cuvis_ai/node/preprocessors.py
def forward(self, images: Tensor, **_: Any) -> dict[str, Tensor]:
    """Normalize images per channel.

    Parameters
    ----------
    images : Tensor
        ``[N, C, H, W]`` float32.

    Returns
    -------
    dict
        ``{"normalized": Tensor [N, C, H, W]}``
    """
    normalized = (images - self._mean_buf) / self._std_buf
    return {"normalized": normalized}

Occlusion Nodes

occlusion

Synthetic occlusion nodes for tracking evaluation (pure PyTorch).

OcclusionNodeBase

OcclusionNodeBase(
    tracking_json_path,
    track_ids,
    occlusion_start_frame,
    occlusion_end_frame,
    **kwargs,
)

Bases: Node, ABC

Base class for synthetic occlusion from tracking masks.

Source code in cuvis_ai/node/occlusion.py
def __init__(
    self,
    tracking_json_path: str,
    track_ids: list[int],
    occlusion_start_frame: int,
    occlusion_end_frame: int,
    **kwargs,
) -> None:
    path = Path(tracking_json_path)
    if not path.is_file():
        raise FileNotFoundError(f"Tracking JSON not found: {tracking_json_path}")

    data = json.loads(path.read_text(encoding="utf-8"))
    track_id_set = set(track_ids)

    self._masks_by_frame: dict[int, list[dict]] = {}
    for ann in data.get("annotations", []):
        tid = ann.get("track_id")
        if tid not in track_id_set:
            continue
        fid = int(ann["image_id"])
        if fid < occlusion_start_frame or fid > occlusion_end_frame:
            continue
        seg = ann.get("segmentation")
        if seg is None or not isinstance(seg, dict):
            continue
        entry = {
            "track_id": int(tid),
            "bbox": ann["bbox"],
            "segmentation": seg,
        }
        self._masks_by_frame.setdefault(fid, []).append(entry)

    self.occlusion_start_frame = int(occlusion_start_frame)
    self.occlusion_end_frame = int(occlusion_end_frame)

    n_frames = len(self._masks_by_frame)
    n_annots = sum(len(v) for v in self._masks_by_frame.values())
    logger.info(
        "OcclusionNode: loaded {} annotations across {} frames for tracks {} (range [{}, {}])",
        n_annots,
        n_frames,
        track_ids,
        occlusion_start_frame,
        occlusion_end_frame,
    )

    super().__init__(
        tracking_json_path=tracking_json_path,
        track_ids=track_ids,
        occlusion_start_frame=occlusion_start_frame,
        occlusion_end_frame=occlusion_end_frame,
        **kwargs,
    )
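The filtering loop above reduces to a plain-Python sketch; the tracking data below is hypothetical:

```python
# Stand-in for a tracking JSON already parsed with json.loads (hypothetical data).
data = {
    "annotations": [
        {"track_id": 1, "image_id": 5, "bbox": [0, 0, 10, 10], "segmentation": {"counts": "abc"}},
        {"track_id": 2, "image_id": 5, "bbox": [5, 5, 20, 20], "segmentation": {"counts": "def"}},
        {"track_id": 1, "image_id": 99, "bbox": [0, 0, 10, 10], "segmentation": {"counts": "ghi"}},
    ]
}

track_id_set = {1}     # only occlude track 1
start, end = 0, 10     # occlusion frame window

masks_by_frame: dict[int, list[dict]] = {}
for ann in data["annotations"]:
    if ann.get("track_id") not in track_id_set:
        continue  # not a selected track
    fid = int(ann["image_id"])
    if not (start <= fid <= end):
        continue  # outside the occlusion window
    seg = ann.get("segmentation")
    if not isinstance(seg, dict):
        continue  # skip missing / non-RLE segmentations
    masks_by_frame.setdefault(fid, []).append(
        {"track_id": ann["track_id"], "bbox": ann["bbox"], "segmentation": seg}
    )
```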
forward
forward(rgb_image, frame_id, **_)

Conditionally occlude an RGB batch using tracking-derived masks.

Source code in cuvis_ai/node/occlusion.py
@torch.no_grad()
def forward(
    self,
    rgb_image: torch.Tensor,
    frame_id: torch.Tensor,
    **_,
) -> dict[str, torch.Tensor]:
    """Conditionally occlude an RGB batch using tracking-derived masks."""
    return self._forward_tensor(data=rgb_image, output_key="rgb_image", frame_id=frame_id)

PoissonOcclusionNode

PoissonOcclusionNode(
    tracking_json_path,
    track_ids,
    occlusion_start_frame,
    occlusion_end_frame,
    fill_color="poisson",
    *,
    input_key=None,
    max_iter=1000,
    tol=1e-06,
    occlusion_shape="bbox",
    bbox_mode="static",
    static_bbox_scale=1.2,
    static_bbox_padding_px=0,
    static_full_width_x=False,
    **kwargs,
)

Bases: OcclusionNodeBase

Pure-PyTorch occlusion node for either RGB frames or hyperspectral cubes.

Source code in cuvis_ai/node/occlusion.py
def __init__(
    self,
    tracking_json_path: str,
    track_ids: list[int],
    occlusion_start_frame: int,
    occlusion_end_frame: int,
    fill_color: tuple[float, float, float] | str = "poisson",
    *,
    input_key: str | None = None,
    max_iter: int = 1000,
    tol: float = 1e-6,
    occlusion_shape: str = "bbox",
    bbox_mode: str = "static",
    static_bbox_scale: float = 1.2,
    static_bbox_padding_px: int = 0,
    static_full_width_x: bool = False,
    **kwargs,
) -> None:
    if occlusion_shape not in self._VALID_SHAPES:
        raise ValueError(
            f"occlusion_shape must be one of {self._VALID_SHAPES}, got '{occlusion_shape}'"
        )
    if bbox_mode not in self._VALID_BBOX_MODES:
        raise ValueError(
            f"bbox_mode must be one of {self._VALID_BBOX_MODES}, got '{bbox_mode}'"
        )
    if static_bbox_scale <= 0:
        raise ValueError("static_bbox_scale must be > 0")
    if static_bbox_padding_px < 0:
        raise ValueError("static_bbox_padding_px must be >= 0")
    if int(max_iter) <= 0:
        raise ValueError("max_iter must be > 0")
    if float(tol) <= 0:
        raise ValueError("tol must be > 0")
    if input_key is not None and input_key not in {"rgb_image", "cube"}:
        raise ValueError("input_key must be 'rgb_image', 'cube', or None")

    self._use_poisson_fill = False
    if isinstance(fill_color, str):
        if fill_color != "poisson":
            raise ValueError("fill_color string must be exactly 'poisson'")
        self.fill_color: tuple[float, float, float] | str = fill_color
        self._use_poisson_fill = True
    else:
        parsed_fill = tuple(float(c) for c in fill_color)
        if len(parsed_fill) != 3:
            raise ValueError("fill_color tuple must have exactly 3 values")
        if any(c < 0.0 or c > 1.0 for c in parsed_fill):
            raise ValueError("fill_color tuple values must be in [0, 1]")
        self.fill_color = parsed_fill

    self.max_iter = int(max_iter)
    self.tol = float(tol)
    self.input_key = input_key
    self.occlusion_shape = occlusion_shape
    self.bbox_mode = bbox_mode
    self.static_bbox_scale = float(static_bbox_scale)
    self.static_bbox_padding_px = int(static_bbox_padding_px)
    self.static_full_width_x = bool(static_full_width_x)

    super().__init__(
        tracking_json_path=tracking_json_path,
        track_ids=track_ids,
        occlusion_start_frame=occlusion_start_frame,
        occlusion_end_frame=occlusion_end_frame,
        fill_color=self.fill_color,
        input_key=self.input_key,
        max_iter=self.max_iter,
        tol=self.tol,
        occlusion_shape=occlusion_shape,
        bbox_mode=bbox_mode,
        static_bbox_scale=static_bbox_scale,
        static_bbox_padding_px=static_bbox_padding_px,
        static_full_width_x=static_full_width_x,
        **kwargs,
    )

    self._static_bboxes_by_track: dict[int, list[float]] = {}
    if self.occlusion_shape == "bbox" and self.bbox_mode == "static":
        self._static_bboxes_by_track = self._build_static_bboxes_by_track()
        logger.info(
            "PoissonOcclusionNode static bboxes: {} tracks (scale={}, padding_px={})",
            len(self._static_bboxes_by_track),
            self.static_bbox_scale,
            self.static_bbox_padding_px,
        )
forward
forward(frame_id, rgb_image=None, cube=None, **_)

Occlude either the provided RGB batch or cube batch for the current frame.

Source code in cuvis_ai/node/occlusion.py
@torch.no_grad()
def forward(
    self,
    frame_id: torch.Tensor,
    rgb_image: torch.Tensor | None = None,
    cube: torch.Tensor | None = None,
    **_,
) -> dict[str, torch.Tensor]:
    """Occlude either the provided RGB batch or cube batch for the current frame."""
    if self.input_key == "rgb_image":
        if rgb_image is None:
            raise ValueError(
                "PoissonOcclusionNode configured for rgb_image but none was provided"
            )
        return self._forward_tensor(data=rgb_image, output_key="rgb_image", frame_id=frame_id)

    if self.input_key == "cube":
        if cube is None:
            raise ValueError("PoissonOcclusionNode configured for cube but none was provided")
        return self._forward_tensor(data=cube, output_key="cube", frame_id=frame_id)

    if (rgb_image is None) and (cube is None):
        raise ValueError("PoissonOcclusionNode requires exactly one input: rgb_image or cube")
    if (rgb_image is not None) and (cube is not None):
        raise ValueError("PoissonOcclusionNode accepts either rgb_image or cube, not both")

    if rgb_image is not None:
        return self._forward_tensor(data=rgb_image, output_key="rgb_image", frame_id=frame_id)

    assert cube is not None
    return self._forward_tensor(data=cube, output_key="cube", frame_id=frame_id)

SolidOcclusionNode

SolidOcclusionNode(
    tracking_json_path,
    track_ids,
    occlusion_start_frame,
    occlusion_end_frame,
    fill_color="poisson",
    *,
    input_key=None,
    max_iter=1000,
    tol=1e-06,
    occlusion_shape="bbox",
    bbox_mode="static",
    static_bbox_scale=1.2,
    static_bbox_padding_px=0,
    static_full_width_x=False,
    **kwargs,
)

Bases: PoissonOcclusionNode

Deprecated alias of PoissonOcclusionNode.

Source code in cuvis_ai/node/occlusion.py
def __init__(
    self,
    tracking_json_path: str,
    track_ids: list[int],
    occlusion_start_frame: int,
    occlusion_end_frame: int,
    fill_color: tuple[float, float, float] | str = "poisson",
    *,
    input_key: str | None = None,
    max_iter: int = 1000,
    tol: float = 1e-6,
    occlusion_shape: str = "bbox",
    bbox_mode: str = "static",
    static_bbox_scale: float = 1.2,
    static_bbox_padding_px: int = 0,
    static_full_width_x: bool = False,
    **kwargs,
) -> None:
    if occlusion_shape not in self._VALID_SHAPES:
        raise ValueError(
            f"occlusion_shape must be one of {self._VALID_SHAPES}, got '{occlusion_shape}'"
        )
    if bbox_mode not in self._VALID_BBOX_MODES:
        raise ValueError(
            f"bbox_mode must be one of {self._VALID_BBOX_MODES}, got '{bbox_mode}'"
        )
    if static_bbox_scale <= 0:
        raise ValueError("static_bbox_scale must be > 0")
    if static_bbox_padding_px < 0:
        raise ValueError("static_bbox_padding_px must be >= 0")
    if int(max_iter) <= 0:
        raise ValueError("max_iter must be > 0")
    if float(tol) <= 0:
        raise ValueError("tol must be > 0")
    if input_key is not None and input_key not in {"rgb_image", "cube"}:
        raise ValueError("input_key must be 'rgb_image', 'cube', or None")

    self._use_poisson_fill = False
    if isinstance(fill_color, str):
        if fill_color != "poisson":
            raise ValueError("fill_color string must be exactly 'poisson'")
        self.fill_color: tuple[float, float, float] | str = fill_color
        self._use_poisson_fill = True
    else:
        parsed_fill = tuple(float(c) for c in fill_color)
        if len(parsed_fill) != 3:
            raise ValueError("fill_color tuple must have exactly 3 values")
        if any(c < 0.0 or c > 1.0 for c in parsed_fill):
            raise ValueError("fill_color tuple values must be in [0, 1]")
        self.fill_color = parsed_fill

    self.max_iter = int(max_iter)
    self.tol = float(tol)
    self.input_key = input_key
    self.occlusion_shape = occlusion_shape
    self.bbox_mode = bbox_mode
    self.static_bbox_scale = float(static_bbox_scale)
    self.static_bbox_padding_px = int(static_bbox_padding_px)
    self.static_full_width_x = bool(static_full_width_x)

    super().__init__(
        tracking_json_path=tracking_json_path,
        track_ids=track_ids,
        occlusion_start_frame=occlusion_start_frame,
        occlusion_end_frame=occlusion_end_frame,
        fill_color=self.fill_color,
        input_key=self.input_key,
        max_iter=self.max_iter,
        tol=self.tol,
        occlusion_shape=occlusion_shape,
        bbox_mode=bbox_mode,
        static_bbox_scale=static_bbox_scale,
        static_bbox_padding_px=static_bbox_padding_px,
        static_full_width_x=static_full_width_x,
        **kwargs,
    )

    self._static_bboxes_by_track: dict[int, list[float]] = {}
    if self.occlusion_shape == "bbox" and self.bbox_mode == "static":
        self._static_bboxes_by_track = self._build_static_bboxes_by_track()
        logger.info(
            "PoissonOcclusionNode static bboxes: {} tracks (scale={}, padding_px={})",
            len(self._static_bboxes_by_track),
            self.static_bbox_scale,
            self.static_bbox_padding_px,
        )

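The `fill_color` argument accepts either the literal string `"poisson"` (which enables the iterative Poisson fill, per `_use_poisson_fill`) or an RGB triple with each component in `[0, 1]`. The validation performed in `__init__` can be sketched as a standalone helper; `parse_fill_color` is an illustrative name, not part of cuvis_ai:

```python
def parse_fill_color(fill_color):
    """Mirror the fill_color checks in __init__ (illustrative helper).

    Returns (value, use_poisson_fill).
    """
    if isinstance(fill_color, str):
        if fill_color != "poisson":
            raise ValueError("fill_color string must be exactly 'poisson'")
        return fill_color, True
    parsed = tuple(float(c) for c in fill_color)
    if len(parsed) != 3:
        raise ValueError("fill_color tuple must have exactly 3 values")
    if any(c < 0.0 or c > 1.0 for c in parsed):
        raise ValueError("fill_color tuple values must be in [0, 1]")
    return parsed, False
```
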
PoissonCubeOcclusionNode

PoissonCubeOcclusionNode(
    tracking_json_path,
    track_ids,
    occlusion_start_frame,
    occlusion_end_frame,
    fill_color="poisson",
    *,
    input_key=None,
    max_iter=1000,
    tol=1e-06,
    occlusion_shape="bbox",
    bbox_mode="static",
    static_bbox_scale=1.2,
    static_bbox_padding_px=0,
    static_full_width_x=False,
    **kwargs,
)

Bases: PoissonOcclusionNode

Deprecated alias of PoissonOcclusionNode with cube-only ports.

Source code in cuvis_ai/node/occlusion.py
def __init__(
    self,
    tracking_json_path: str,
    track_ids: list[int],
    occlusion_start_frame: int,
    occlusion_end_frame: int,
    fill_color: tuple[float, float, float] | str = "poisson",
    *,
    input_key: str | None = None,
    max_iter: int = 1000,
    tol: float = 1e-6,
    occlusion_shape: str = "bbox",
    bbox_mode: str = "static",
    static_bbox_scale: float = 1.2,
    static_bbox_padding_px: int = 0,
    static_full_width_x: bool = False,
    **kwargs,
) -> None:
    if occlusion_shape not in self._VALID_SHAPES:
        raise ValueError(
            f"occlusion_shape must be one of {self._VALID_SHAPES}, got '{occlusion_shape}'"
        )
    if bbox_mode not in self._VALID_BBOX_MODES:
        raise ValueError(
            f"bbox_mode must be one of {self._VALID_BBOX_MODES}, got '{bbox_mode}'"
        )
    if static_bbox_scale <= 0:
        raise ValueError("static_bbox_scale must be > 0")
    if static_bbox_padding_px < 0:
        raise ValueError("static_bbox_padding_px must be >= 0")
    if int(max_iter) <= 0:
        raise ValueError("max_iter must be > 0")
    if float(tol) <= 0:
        raise ValueError("tol must be > 0")
    if input_key is not None and input_key not in {"rgb_image", "cube"}:
        raise ValueError("input_key must be 'rgb_image', 'cube', or None")

    self._use_poisson_fill = False
    if isinstance(fill_color, str):
        if fill_color != "poisson":
            raise ValueError("fill_color string must be exactly 'poisson'")
        self.fill_color: tuple[float, float, float] | str = fill_color
        self._use_poisson_fill = True
    else:
        parsed_fill = tuple(float(c) for c in fill_color)
        if len(parsed_fill) != 3:
            raise ValueError("fill_color tuple must have exactly 3 values")
        if any(c < 0.0 or c > 1.0 for c in parsed_fill):
            raise ValueError("fill_color tuple values must be in [0, 1]")
        self.fill_color = parsed_fill

    self.max_iter = int(max_iter)
    self.tol = float(tol)
    self.input_key = input_key
    self.occlusion_shape = occlusion_shape
    self.bbox_mode = bbox_mode
    self.static_bbox_scale = float(static_bbox_scale)
    self.static_bbox_padding_px = int(static_bbox_padding_px)
    self.static_full_width_x = bool(static_full_width_x)

    super().__init__(
        tracking_json_path=tracking_json_path,
        track_ids=track_ids,
        occlusion_start_frame=occlusion_start_frame,
        occlusion_end_frame=occlusion_end_frame,
        fill_color=self.fill_color,
        input_key=self.input_key,
        max_iter=self.max_iter,
        tol=self.tol,
        occlusion_shape=occlusion_shape,
        bbox_mode=bbox_mode,
        static_bbox_scale=static_bbox_scale,
        static_bbox_padding_px=static_bbox_padding_px,
        static_full_width_x=static_full_width_x,
        **kwargs,
    )

    self._static_bboxes_by_track: dict[int, list[float]] = {}
    if self.occlusion_shape == "bbox" and self.bbox_mode == "static":
        self._static_bboxes_by_track = self._build_static_bboxes_by_track()
        logger.info(
            "PoissonOcclusionNode static bboxes: {} tracks (scale={}, padding_px={})",
            len(self._static_bboxes_by_track),
            self.static_bbox_scale,
            self.static_bbox_padding_px,
        )
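One plausible way `static_bbox_scale` and `static_bbox_padding_px` could combine to grow a tracked box is sketched below. This is an illustration of the parameter semantics only; the actual `_build_static_bboxes_by_track` implementation is not shown here and may differ:

```python
def expand_bbox(x0, y0, x1, y1, scale=1.2, padding_px=0):
    """Hypothetical bbox expansion: scale about the center, then pad in pixels."""
    cx, cy = (x0 + x1) / 2.0, (y0 + y1) / 2.0
    half_w = (x1 - x0) / 2.0 * scale + padding_px
    half_h = (y1 - y0) / 2.0 * scale + padding_px
    return [cx - half_w, cy - half_h, cx + half_w, cy + half_h]
```

With `scale=1.0` and `padding_px=2`, a 10x10 box grows by two pixels on every side.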

forward

forward(cube, frame_id, **_)

Apply cube-only occlusion using the parent implementation.

Source code in cuvis_ai/node/occlusion.py
@torch.no_grad()
def forward(
    self,
    cube: torch.Tensor,
    frame_id: torch.Tensor,
    **_,
) -> dict[str, torch.Tensor]:
    """Apply cube-only occlusion using the parent implementation."""
    return super().forward(frame_id=frame_id, cube=cube)