Skip to content

Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Data API

Current data entry, reader, and frame-stream APIs.

Data Nodes

data

Data preparation nodes for CU3S hyperspectral pipelines.

CU3SDataNode

Bases: Node

General-purpose data node for CU3S hyperspectral sequences.

This node normalizes common CU3S batch inputs for pipelines:

  • converts cube from uint16 to float32
  • passes optional mask through unchanged
  • extracts 1D wavelengths from batched input
forward
forward(
    cube, mask=None, wavelengths=None, mesu_index=None, **_
)

Normalize CU3S batch data for pipeline consumption.

Source code in cuvis_ai/node/data.py
def forward(
    self,
    cube: torch.Tensor,
    mask: torch.Tensor | None = None,
    wavelengths: torch.Tensor | None = None,
    mesu_index: torch.Tensor | None = None,
    **_: Any,
) -> dict[str, torch.Tensor | np.ndarray]:
    """Normalize CU3S batch data for pipeline consumption."""
    result: dict[str, torch.Tensor | np.ndarray] = {"cube": cube.to(torch.float32)}

    # Keep the same behavior as existing data nodes: use first batch entry.
    if wavelengths is not None:
        result["wavelengths"] = wavelengths[0].cpu().numpy()

    if mask is not None:
        result["mask"] = mask

    if mesu_index is not None:
        result["mesu_index"] = mesu_index

    return result

LentilsAnomalyDataNode

LentilsAnomalyDataNode(
    normal_class_ids, anomaly_class_ids=None, **kwargs
)

Bases: CU3SDataNode

Lentils-specific CU3S data node with binary anomaly label mapping.

Inherits shared CU3S normalization (cube + wavelengths) and additionally maps multi-class masks to binary anomaly masks.

Source code in cuvis_ai/node/data.py
def __init__(
    self, normal_class_ids: list[int], anomaly_class_ids: list[int] | None = None, **kwargs
) -> None:
    """Set up shared CU3S handling plus the binary anomaly label mapper."""
    # Forward the class-id parameters to the base Node so they are captured
    # for config/serialization compatibility.
    super().__init__(
        normal_class_ids=normal_class_ids,
        anomaly_class_ids=anomaly_class_ids,
        **kwargs,
    )
    # The mapper collapses multi-class masks into binary anomaly masks.
    self._binary_mapper = BinaryAnomalyLabelMapper(
        normal_class_ids=normal_class_ids, anomaly_class_ids=anomaly_class_ids
    )
forward
forward(cube, mask=None, wavelengths=None, **_)

Apply CU3S normalization and optional Lentils binary mask mapping.

Source code in cuvis_ai/node/data.py
def forward(
    self,
    cube: torch.Tensor,
    mask: torch.Tensor | None = None,
    wavelengths: torch.Tensor | None = None,
    **_: Any,
) -> dict[str, torch.Tensor | np.ndarray]:
    """Apply CU3S normalization and optional Lentils binary mask mapping."""
    # Run the shared normalization without a mask; the mapped binary mask is
    # attached below instead of the raw multi-class one.
    result = super().forward(cube=cube, mask=None, wavelengths=wavelengths, **_)

    if mask is None:
        return result

    # The mapper expects a channel-last mask, so expand BHW -> BHWC first.
    mapped = self._binary_mapper.forward(cube=cube, mask=mask.unsqueeze(-1), **_)
    result["mask"] = mapped["mask"]
    return result

Tracking JSON Readers

DetectionJsonReader

DetectionJsonReader(json_path, **kwargs)

Bases: Node

Read COCO detection JSON and emit tensors per frame.

Outputs per call:

  • frame_id: int64 [1]
  • bboxes: float32 [1, N, 4] (xyxy)
  • category_ids: int64 [1, N]
  • confidences: float32 [1, N]
  • orig_hw: int64 [1, 2]
Source code in cuvis_ai/node/json_file.py
def __init__(self, json_path: str, **kwargs: Any) -> None:
    """Load the COCO detection JSON and index annotations by image id."""
    self.json_path = Path(json_path)
    if not self.json_path.exists():
        raise FileNotFoundError(f"JSON not found: {self.json_path}")

    with self.json_path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    # Index images by id and bucket annotations under their image id.
    self._images = {int(img["id"]): img for img in data.get("images", [])}
    self._annotations_by_img: dict[int, list[dict[str, Any]]] = {}
    for ann in data.get("annotations", []):
        image_id = int(ann["image_id"])
        self._annotations_by_img.setdefault(image_id, []).append(ann)

    # Frames are emitted in ascending image-id order via a cursor.
    self._frame_ids = sorted(self._images)
    self._cursor = 0

    super().__init__(json_path=str(self.json_path), **kwargs)

reset

reset()

Rewind to the first frame.

Source code in cuvis_ai/node/json_file.py
def reset(self) -> None:  # noqa: D401
    """Rewind to the first frame."""
    # Sequential emission restarts from the first (lowest) frame id.
    self._cursor = 0

forward

forward(context=None, **_)

Emit detections for the next frame in the detection JSON stream.

Source code in cuvis_ai/node/json_file.py
def forward(self, context: Context | None = None, **_: Any) -> dict[str, Any]:  # noqa: ARG002
    """Emit detections for the next frame in the detection JSON stream.

    Raises StopIteration once every frame id has been consumed.
    """
    if self._cursor >= len(self._frame_ids):
        raise StopIteration("No more frames in detection JSON")

    frame_id = self._frame_ids[self._cursor]
    self._cursor += 1

    img = self._images[frame_id]

    # Collect xyxy boxes, category ids, and scores for this frame.
    boxes: list[list[float]] = []
    categories: list[int] = []
    confidences: list[float] = []
    for ann in self._annotations_by_img.get(frame_id, []):
        x, y, w, h = ann["bbox"]
        boxes.append([x, y, x + w, y + h])
        cat = ann.get("category_id", 0)
        conf = ann.get("score", 0.0)
        # Explicit JSON null falls back to the same defaults as a missing key.
        categories.append(0 if cat is None else int(cat))
        confidences.append(0.0 if conf is None else float(conf))

    # Frames without annotations still produce correctly-shaped N == 0 tensors.
    if boxes:
        bboxes_t = torch.tensor([boxes], dtype=torch.float32)
        cats_t = torch.tensor([categories], dtype=torch.int64)
        scores_t = torch.tensor([confidences], dtype=torch.float32)
    else:
        bboxes_t = torch.empty((1, 0, 4), dtype=torch.float32)
        cats_t = torch.empty((1, 0), dtype=torch.int64)
        scores_t = torch.empty((1, 0), dtype=torch.float32)

    orig_hw = torch.tensor(
        [[int(img.get("height", 0)), int(img.get("width", 0))]], dtype=torch.int64
    )

    return {
        "frame_id": torch.tensor([frame_id], dtype=torch.int64),
        "bboxes": bboxes_t,
        "category_ids": cats_t,
        "confidences": scores_t,
        "orig_hw": orig_hw,
    }

TrackingResultsReader

TrackingResultsReader(
    json_path, required_format=None, **kwargs
)

Bases: Node

Read tracking results JSON (bbox or mask format) and emit per-frame tensors.

Supports two JSON formats:

  1. COCO bbox tracking: images + annotations with bbox and track_id fields. Emits bboxes, category_ids, confidences, track_ids.

  2. Video COCO: videos + annotations with a segmentations list of RLE dicts. Emits mask label map and object_ids.

Optional outputs are None when the format doesn't provide them.

Frame synchronization: When the optional frame_id input is connected (e.g. from CU3SDataNode.mesu_index), the reader looks up detections for that specific frame instead of cursor-advancing. This guarantees that the emitted bboxes/masks correspond to the same frame as the cube data. When frame_id is not connected, the reader uses the internal cursor (legacy behavior).

Source code in cuvis_ai/node/json_file.py
def __init__(
    self,
    json_path: str,
    required_format: str | None = None,
    **kwargs: Any,
) -> None:
    """Load a tracking JSON, detect its format, and build per-frame lookups.

    Raises FileNotFoundError for a missing path and ValueError for an invalid
    required_format or an unrecognized JSON layout.
    """
    self.json_path = Path(json_path)
    if not self.json_path.exists():
        raise FileNotFoundError(f"JSON not found: {self.json_path}")
    if required_format is not None and required_format not in {"coco_bbox", "video_coco"}:
        raise ValueError(
            "required_format must be one of {'coco_bbox', 'video_coco'} when provided."
        )

    with self.json_path.open("r", encoding="utf-8") as f:
        data = json.load(f)

    # Detect the format from the top-level keys and build the frame lookups.
    if "videos" in data and "annotations" in data:
        self._format = "video_coco"
        self._init_video_coco(data)
    elif "images" in data and "annotations" in data:
        self._format = "coco_bbox"
        self._init_coco_bbox(data)
    else:
        raise ValueError(
            f"Unsupported tracking JSON format in {self.json_path}. "
            "Expected COCO bbox (images+annotations) "
            "or video COCO (videos+annotations)."
        )

    # A format mismatch is recorded here but only raised on forward().
    self._required_format = required_format
    self._format_mismatch_msg: str | None = None
    if required_format is not None and self._format != required_format:
        self._format_mismatch_msg = (
            f"Tracking JSON format is '{self._format}', "
            f"but required_format is '{self._required_format}'."
        )

    self._cursor = 0
    logger.info(
        "[TrackingResultsReader] format={}, required_format={}, frames={}, path={}",
        self._format,
        self._required_format,
        len(self._frame_ids),
        self.json_path,
    )

    super().__init__(json_path=str(self.json_path), required_format=required_format, **kwargs)

num_frames property

num_frames

Return the number of frames addressable by this reader.

format property

format

Return the detected tracking JSON format identifier.

reset

reset()

Rewind sequential reads to the first available frame.

Source code in cuvis_ai/node/json_file.py
def reset(self) -> None:
    """Rewind sequential reads to the first available frame."""
    # Only affects cursor mode; explicit frame_id lookups are unaffected.
    self._cursor = 0

forward

forward(frame_id=None, context=None, **_)

Emit tracking tensors for an explicit frame or the next cursor frame.

Source code in cuvis_ai/node/json_file.py
def forward(
    self,
    frame_id: torch.Tensor | None = None,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Emit tracking tensors for an explicit frame or the next cursor frame.

    Raises ValueError if the detected JSON format does not match the
    required_format given at construction.
    """
    if self._format_mismatch_msg is not None:
        raise ValueError(self._format_mismatch_msg)

    if frame_id is None:
        # Legacy sequential mode: walk frames via the internal cursor.
        if self._cursor >= len(self._frame_ids):
            raise StopIteration("No more frames in tracking JSON")
        fid = self._frame_ids[self._cursor]
        self._cursor += 1
    else:
        # Synchronized mode: emit detections for the requested frame id.
        fid = int(frame_id.item())

    emit = self._emit_coco_bbox if self._format == "coco_bbox" else self._emit_video_coco
    return emit(fid)

NumPy Readers

numpy_reader

Numpy-backed constant source node.

NpyReader

NpyReader(file_path, **kwargs)

Bases: Node

Load a .npy file once and return the same tensor every forward call.

Source code in cuvis_ai/node/numpy_reader.py
def __init__(self, file_path: str, **kwargs: Any) -> None:
    """Load the .npy file once, pad to BHWC, and cache it as a buffer."""
    self.file_path = str(Path(file_path))
    source = Path(self.file_path)
    if not source.exists():
        raise FileNotFoundError(f"NpyReader input file not found: {source}")

    # Load eagerly, cast to float32, and pad to the 4-channel BHWC layout
    # before handing the contiguous array to torch.
    array = np.asarray(np.load(source, allow_pickle=False), dtype=np.float32)
    tensor = torch.from_numpy(np.ascontiguousarray(_pad_to_bhwc4(array)))

    super().__init__(file_path=self.file_path, **kwargs)
    # Persistent buffer so the cached data is included in state_dict().
    self.register_buffer("_data_buf", tensor, persistent=True)
forward
forward(frame_id=None, **_)

Return cached tensor.

Source code in cuvis_ai/node/numpy_reader.py
@torch.no_grad()
def forward(
    self,
    frame_id: torch.Tensor | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Return cached tensor."""
    return {"data": self._data_buf}

Video Datasets And Frame Sources

video

Video utilities: frame iteration, datasets, Lightning DataModule, and export nodes.

ToVideoNode

ToVideoNode(
    output_video_path,
    frame_rate=10.0,
    frame_rotation=None,
    codec="mp4v",
    overlay_title=None,
    **kwargs,
)

Bases: Node

Write incoming RGB frames directly to a video file.

This node opens a single OpenCV VideoWriter and appends frames on each forward call. It is intended for streaming pipelines where frames arrive incrementally.

Parameters:

Name Type Description Default
output_video_path str

Output path for the generated video file (for example .mp4).

required
frame_rate float

Video frame rate in frames per second. Must be positive. Default is 10.0.

10.0
frame_rotation int | None

Optional frame rotation in degrees. Supported values are -90, 90, 180 (and aliases 270, -270, -180). Positive values rotate counterclockwise, negative values rotate clockwise. Default is None (no rotation).

None
codec str

FourCC codec string (length 4). Default is "mp4v".

'mp4v'
overlay_title str | None

Optional static title rendered at the top center with its own slim darkened background block. Default is None.

None
Source code in cuvis_ai/node/video.py
def __init__(
    self,
    output_video_path: str,
    frame_rate: float = 10.0,
    frame_rotation: int | None = None,
    codec: str = "mp4v",
    overlay_title: str | None = None,
    **kwargs: Any,
) -> None:
    """Validate writer settings and prepare the output directory."""
    # Fail fast on invalid settings before touching the filesystem.
    if frame_rate <= 0:
        raise ValueError("frame_rate must be > 0")
    if len(codec) != 4:
        raise ValueError("codec must be a 4-character FourCC string")
    if frame_rotation not in {None, 0, 90, -90, 180, -180, 270, -270}:
        raise ValueError(
            "frame_rotation must be one of: None, 0, 90, -90, 180, -180, 270, -270"
        )

    self.output_video_path = Path(output_video_path)
    self.frame_rate = float(frame_rate)
    self.frame_rotation = self._normalize_rotation(frame_rotation)
    self.codec = codec

    # An empty or whitespace-only title is treated the same as no title.
    title = None if overlay_title is None else str(overlay_title).strip()
    self.overlay_title = title or None

    # The writer and frame size are created lazily on the first forward().
    self._writer: cv2.VideoWriter | None = None
    self._frame_size: tuple[int, int] | None = None

    self.output_video_path.parent.mkdir(parents=True, exist_ok=True)

    super().__init__(
        output_video_path=output_video_path,
        frame_rate=frame_rate,
        frame_rotation=frame_rotation,
        codec=codec,
        overlay_title=self.overlay_title,
        **kwargs,
    )
forward
forward(rgb_image, frame_id=None, context=None, **_)

Append incoming RGB frames to the configured video file.

Source code in cuvis_ai/node/video.py
def forward(
    self,
    rgb_image: torch.Tensor,
    frame_id: torch.Tensor | None = None,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, str]:
    """Append incoming RGB frames to the configured video file.

    Raises ValueError when a frame's size differs from the first frame's.
    """
    frames_u8 = self._to_uint8_batch(rgb_image)

    for idx, frame in enumerate(frames_u8):
        # Overlays are drawn on the frame before it is rotated.
        self._draw_title_overlay(frame)
        if frame_id is not None and idx < len(frame_id):
            label = f"frame {int(frame_id[idx].item())}"
            draw_text(frame, 8, 8, label, (255, 255, 255), scale=2, bg=True)

        frame = self._rotate_frame(frame)
        height, width = int(frame.shape[0]), int(frame.shape[1])

        # Open the writer lazily on the first frame; later frames must match.
        if self._writer is None:
            self._init_writer(height=height, width=width)
        elif self._frame_size != (height, width):
            raise ValueError(
                f"All frames must share one size. Expected {self._frame_size}, got {(height, width)}"
            )

        # OpenCV's writer expects BGR channel order.
        self._writer.write(frame[..., [2, 1, 0]].numpy())

    return {"video_path": str(self.output_video_path)}
close
close()

Release the underlying video writer if it exists.

Source code in cuvis_ai/node/video.py
def close(self) -> None:
    """Release the underlying video writer if it exists."""
    writer = self._writer
    if writer is None:
        return
    # Release the handle and drop the reference so a later forward() would
    # lazily create a fresh writer.
    writer.release()
    self._writer = None

VideoFrameNode

Bases: Node

Passthrough source node that receives RGB frames from the batch.

forward
forward(rgb_image, frame_id=None, **_)

Pass through RGB frames and optional frame IDs from the batch.

Source code in cuvis_ai/node/video.py
def forward(
    self,
    rgb_image: torch.Tensor,
    frame_id: torch.Tensor | None = None,
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Pass through RGB frames and optional frame IDs from the batch."""
    result: dict[str, torch.Tensor] = {"rgb_image": rgb_image}
    if frame_id is not None:
        result["frame_id"] = frame_id
    return result