Skip to content

Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Output Nodes

Output nodes persist detections, tracks, features, and rendered video artifacts.

Tracking And Detection Writers

CocoTrackMaskWriter

CocoTrackMaskWriter(
    output_json_path,
    default_category_name="object",
    write_empty_frames=True,
    atomic_write=True,
    flush_interval=0,
    **kwargs,
)

Bases: _BaseCocoTrackWriter

Write mask tracking outputs into video_coco JSON.

Source code in cuvis_ai/node/json_file.py
def __init__(
    self,
    output_json_path: str,
    default_category_name: str = "object",
    write_empty_frames: bool = True,
    atomic_write: bool = True,
    flush_interval: int = 0,
    **kwargs: Any,
) -> None:
    """Set up the mask-track writer and its per-track accumulators.

    All bookkeeping attributes are created before delegating to the base
    writer so they exist by the time ``super().__init__`` runs.
    """
    # Guard: the fallback category label must be usable.
    if not default_category_name:
        raise ValueError("default_category_name must be a non-empty string.")

    self.default_category_name = default_category_name
    self.write_empty_frames = bool(write_empty_frames)

    # Frame geometry, keyed by frame index.
    self._frame_hw_by_id: dict[int, tuple[int, int]] = {}
    # Per-track accumulators, keyed by object id and then frame index.
    self._track_segmentations: dict[int, dict[int, dict[str, Any]]] = {}
    self._track_scores: dict[int, dict[int, float]] = {}
    self._track_bboxes: dict[int, dict[int, list[float]]] = {}
    self._track_areas: dict[int, dict[int, float]] = {}
    # One category per track, plus the id -> name lookup for export.
    self._track_category_ids: dict[int, int] = {}
    self._category_id_to_name: dict[int, str] = {}

    super().__init__(
        output_json_path=output_json_path,
        atomic_write=atomic_write,
        flush_interval=flush_interval,
        default_category_name=default_category_name,
        write_empty_frames=write_empty_frames,
        **kwargs,
    )

forward

forward(
    frame_id,
    mask,
    object_ids,
    detection_scores,
    category_ids=None,
    category_semantics=None,
    context=None,
    **_,
)

Store one frame of tracked masks and metadata for later JSON export.

Source code in cuvis_ai/node/json_file.py
def forward(
    self,
    frame_id: torch.Tensor,
    mask: torch.Tensor,
    object_ids: torch.Tensor,
    detection_scores: torch.Tensor,
    category_ids: torch.Tensor | None = None,
    category_semantics: torch.Tensor | None = None,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Store one frame of tracked masks and metadata for later JSON export.

    Parameters
    ----------
    frame_id : torch.Tensor
        Scalar-like tensor holding the frame index.
    mask : torch.Tensor
        Integer label mask for the frame; each pixel carries the object id
        of the track covering it, with values <= 0 treated as background.
    object_ids : torch.Tensor
        1-D tensor of tracked object ids, aligned with ``detection_scores``.
    detection_scores : torch.Tensor
        1-D tensor of per-object confidence scores.
    category_ids : torch.Tensor | None
        Optional 1-D per-object category ids (must align with
        ``object_ids``); when omitted, every object defaults to category 1.
    category_semantics : torch.Tensor | None
        Optional category naming payload handed to
        ``_update_category_semantics``.
    context : Context | None
        Unused; accepted for pipeline-interface compatibility.

    Returns
    -------
    dict
        Always an empty dict (this node is a sink).
    """
    frame_idx = self._parse_frame_id(frame_id)
    mask_2d = self._parse_mask(mask)
    ids_1d = self._parse_vector(object_ids, port_name="object_ids")
    scores_1d = self._parse_vector(detection_scores, port_name="detection_scores")
    self._validate_alignment(ids_1d, scores_1d, "object_ids", "detection_scores")
    category_ids_1d: torch.Tensor | None = None
    if category_ids is not None:
        category_ids_1d = self._parse_vector(category_ids, port_name="category_ids")
        self._validate_alignment(ids_1d, category_ids_1d, "object_ids", "category_ids")
    self._update_category_semantics(category_semantics)

    frame_height = int(mask_2d.shape[0])
    frame_width = int(mask_2d.shape[1])

    # Replacing an existing frame should be idempotent.
    self._drop_frame(frame_idx)

    object_ids_list = ids_1d.to(dtype=torch.int64).cpu().tolist()
    detection_scores_list = scores_1d.to(dtype=torch.float32).cpu().tolist()
    # When no per-object categories were supplied, default every object to category 1.
    category_ids_list = (
        category_ids_1d.to(dtype=torch.int64).cpu().tolist()
        if category_ids_1d is not None
        else [1] * len(object_ids_list)
    )
    # Map positive (valid) object ids to their scores; non-positive ids are skipped.
    score_by_obj_id: dict[int, float] = {
        int(obj_id): float(score)
        for obj_id, score in zip(object_ids_list, detection_scores_list, strict=False)
        if int(obj_id) > 0
    }
    # Pin each track to exactly one category; a track that arrives with a
    # different category than previously recorded is an error.
    category_by_obj_id: dict[int, int] = {}
    for obj_id, category_id in zip(object_ids_list, category_ids_list, strict=False):
        oid = int(obj_id)
        cid = int(category_id)
        if oid <= 0:
            continue
        if cid <= 0:
            raise ValueError("category_ids must be positive for tracked objects.")
        existing_category_id = self._track_category_ids.get(oid)
        if existing_category_id is not None and existing_category_id != cid:
            raise ValueError(
                f"Track {oid} received conflicting category IDs: "
                f"{existing_category_id} vs {cid}."
            )
        self._track_category_ids.setdefault(oid, cid)
        category_by_obj_id[oid] = cid
        # Register a fallback display name for the category if none is known yet.
        fallback_name = self.default_category_name if cid == 1 else f"category_{cid}"
        self._category_id_to_name.setdefault(cid, fallback_name)
    # Object ids actually visible in this frame's mask.
    present_obj_ids = {
        int(obj_id)
        for obj_id in mask_2d.to(dtype=torch.int64).unique().cpu().tolist()
        if int(obj_id) > 0
    }

    # Keep ids in first-seen order, deduplicated, and only when they have
    # at least one pixel in the mask.
    export_obj_ids: list[int] = []
    seen_obj_ids: set[int] = set()
    for obj_id in object_ids_list:
        oid = int(obj_id)
        if oid <= 0 or oid not in present_obj_ids or oid in seen_obj_ids:
            continue
        seen_obj_ids.add(oid)
        export_obj_ids.append(oid)

    if not export_obj_ids and not self.write_empty_frames:
        return {}

    self._frame_hw_by_id[frame_idx] = (frame_height, frame_width)

    # Encode each object's binary mask as RLE and derive bbox/area from it.
    for oid in export_obj_ids:
        obj_mask = mask_2d.eq(oid)
        # Presence was already checked above; guard kept for safety.
        if not bool(torch.any(obj_mask)):
            continue

        mask_np = obj_mask.to(dtype=torch.uint8).detach().cpu().numpy()
        rle_json = coco_rle_encode(mask_np)
        bbox = coco_rle_to_bbox(rle_json)
        area = coco_rle_area(rle_json)

        self._track_segmentations.setdefault(oid, {})[frame_idx] = rle_json
        self._track_scores.setdefault(oid, {})[frame_idx] = float(score_by_obj_id.get(oid, 0.0))
        self._track_bboxes.setdefault(oid, {})[frame_idx] = bbox
        self._track_areas.setdefault(oid, {})[frame_idx] = area
        if oid in category_by_obj_id:
            self._track_category_ids.setdefault(oid, category_by_obj_id[oid])

    self._mark_dirty_and_maybe_flush()
    return {}

CocoTrackBBoxWriter

CocoTrackBBoxWriter(
    output_json_path,
    category_id_to_name=None,
    write_empty_frames=True,
    atomic_write=True,
    flush_interval=0,
    **kwargs,
)

Bases: _BaseCocoTrackWriter

Write tracked bbox outputs into COCO tracking JSON.

Source code in cuvis_ai/node/json_file.py
def __init__(
    self,
    output_json_path: str,
    category_id_to_name: dict[int, str] | None = None,
    write_empty_frames: bool = True,
    atomic_write: bool = True,
    flush_interval: int = 0,
    **kwargs: Any,
) -> None:
    """Configure the tracked-bbox COCO writer and its per-frame store."""
    # Copy the caller's mapping so later external mutation cannot leak in;
    # fall back to a single generic "object" category.
    if category_id_to_name is None:
        mapping: dict[int, str] = {0: "object"}
    else:
        mapping = dict(category_id_to_name)
    self.category_id_to_name = mapping
    self.write_empty_frames = bool(write_empty_frames)
    # Frames accumulated by forward(), keyed by frame index.
    self._frames_by_id: dict[int, dict[str, Any]] = {}

    super().__init__(
        output_json_path=output_json_path,
        atomic_write=atomic_write,
        flush_interval=flush_interval,
        category_id_to_name=self.category_id_to_name,
        write_empty_frames=write_empty_frames,
        **kwargs,
    )

forward

forward(
    frame_id,
    bboxes,
    category_ids,
    confidences,
    track_ids,
    orig_hw,
    context=None,
    **_,
)

Store one frame of tracked bounding boxes for later export.

Source code in cuvis_ai/node/json_file.py
def forward(
    self,
    frame_id: torch.Tensor,
    bboxes: torch.Tensor,
    category_ids: torch.Tensor,
    confidences: torch.Tensor,
    track_ids: torch.Tensor,
    orig_hw: torch.Tensor,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Store one frame of tracked bounding boxes for later export."""
    frame_idx = self._parse_frame_id(frame_id)
    cat_vec = self._parse_vector(category_ids, port_name="category_ids")
    score_vec = self._parse_vector(confidences, port_name="confidences")
    tid_vec = self._parse_vector(track_ids, port_name="track_ids")
    self._validate_alignment(cat_vec, score_vec, "category_ids", "confidences")
    self._validate_alignment(cat_vec, tid_vec, "category_ids", "track_ids")

    height, width = int(orig_hw[0, 0]), int(orig_hw[0, 1])
    # Drop a leading batch dimension if present.
    boxes = bboxes if bboxes.ndim != 3 else bboxes[0]

    records: list[dict[str, Any]] = []
    for idx in range(int(cat_vec.numel())):
        # Boxes arrive as corner coordinates; COCO stores [x, y, w, h].
        left, top, right, bottom = boxes[idx].cpu().tolist()
        box_w = float(right - left)
        box_h = float(bottom - top)
        records.append(
            {
                "category_id": int(cat_vec[idx].item()),
                "bbox": [float(left), float(top), box_w, box_h],
                "area": box_w * box_h,
                "score": float(score_vec[idx].item()),
                "track_id": int(tid_vec[idx].item()),
            }
        )

    # Optionally skip frames without any detections.
    if not records and not self.write_empty_frames:
        return {}

    self._frames_by_id[frame_idx] = {
        "frame_idx": frame_idx,
        "height": height,
        "width": width,
        "detections": records,
    }
    self._mark_dirty_and_maybe_flush()
    return {}

DetectionCocoJsonNode

DetectionCocoJsonNode(
    output_json_path,
    category_id_to_name=None,
    write_empty_frames=True,
    atomic_write=True,
    flush_interval=0,
    **kwargs,
)

Bases: _BaseJsonWriterNode

Write frame-wise detections into COCO detection JSON.

Source code in cuvis_ai/node/json_file.py
def __init__(
    self,
    output_json_path: str,
    category_id_to_name: dict[int, str] | None = None,
    write_empty_frames: bool = True,
    atomic_write: bool = True,
    flush_interval: int = 0,
    **kwargs: Any,
) -> None:
    """Configure the frame-wise detection COCO writer."""
    # Copy the caller's mapping so later external mutation cannot leak in;
    # fall back to a single "person" category.
    if category_id_to_name is None:
        mapping: dict[int, str] = {0: "person"}
    else:
        mapping = dict(category_id_to_name)
    self.category_id_to_name = mapping
    self.write_empty_frames = bool(write_empty_frames)
    # Frames accumulated by forward(), keyed by frame index.
    self._frames_by_id: dict[int, dict[str, Any]] = {}

    super().__init__(
        output_json_path=output_json_path,
        atomic_write=atomic_write,
        flush_interval=flush_interval,
        category_id_to_name=self.category_id_to_name,
        write_empty_frames=write_empty_frames,
        **kwargs,
    )

forward

forward(
    frame_id,
    bboxes,
    category_ids,
    confidences,
    orig_hw,
    context=None,
    **_,
)

Store one frame of detections for COCO JSON serialization.

Source code in cuvis_ai/node/json_file.py
def forward(
    self,
    frame_id: torch.Tensor,
    bboxes: torch.Tensor,
    category_ids: torch.Tensor,
    confidences: torch.Tensor,
    orig_hw: torch.Tensor,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, Any]:
    """Store one frame of detections for COCO JSON serialization."""
    # Parsing/validation helpers are reused from the tracking writer base.
    frame_idx = _BaseCocoTrackWriter._parse_frame_id(frame_id)
    cat_vec = _BaseCocoTrackWriter._parse_vector(category_ids, port_name="category_ids")
    conf_vec = _BaseCocoTrackWriter._parse_vector(confidences, port_name="confidences")
    _BaseCocoTrackWriter._validate_alignment(cat_vec, conf_vec, "category_ids", "confidences")

    height, width = int(orig_hw[0, 0]), int(orig_hw[0, 1])
    # Drop a leading batch dimension if present.
    boxes = bboxes if bboxes.ndim != 3 else bboxes[0]

    records: list[dict[str, Any]] = []
    for idx in range(int(cat_vec.numel())):
        # Boxes arrive as corner coordinates; COCO stores [x, y, w, h].
        left, top, right, bottom = boxes[idx].cpu().tolist()
        box_w = float(right - left)
        box_h = float(bottom - top)
        records.append(
            {
                "category_id": int(cat_vec[idx].item()),
                "bbox": [float(left), float(top), box_w, box_h],
                "area": box_w * box_h,
                "score": float(conf_vec[idx].item()),
            }
        )

    # Optionally skip frames without any detections.
    if not records and not self.write_empty_frames:
        return {}

    self._frames_by_id[frame_idx] = {
        "frame_idx": frame_idx,
        "height": height,
        "width": width,
        "detections": records,
    }
    self._mark_dirty_and_maybe_flush()
    return {}

NumPy Feature Writers

numpy_writer

Per-frame numpy feature writer node.

NumpyFeatureWriterNode

NumpyFeatureWriterNode(
    output_dir, prefix="features", **kwargs
)

Bases: Node

Save per-frame feature tensors to .npy files.

Writes one .npy file per frame, named {prefix}_{frame_id:06d}.npy. Useful for offline analysis, clustering, or evaluation of ReID embeddings.

Parameters:

Name Type Description Default
output_dir str

Directory to write .npy files into.

required
prefix str

Filename prefix (default "features").

'features'
Source code in cuvis_ai/node/numpy_writer.py
def __init__(
    self,
    output_dir: str,
    prefix: str = "features",
    **kwargs: Any,
) -> None:
    """Remember the output directory and filename prefix.

    The directory itself is created lazily on the first forward() call.
    """
    resolved_dir = str(output_dir)
    resolved_prefix = str(prefix)
    self.output_dir = resolved_dir
    self.prefix = resolved_prefix
    # Flag flipped once the directory has been created.
    self._dir_created = False
    super().__init__(output_dir=resolved_dir, prefix=resolved_prefix, **kwargs)
forward
forward(features, frame_id, **_)

Write features to a .npy file.

Parameters:

Name Type Description Default
features Tensor

[B, N, D] float32. Batch dimension is squeezed before saving.

required
frame_id Tensor

(1,) int64 scalar frame index.

required

Returns:

Type Description
dict

Empty dict (sink node).

Source code in cuvis_ai/node/numpy_writer.py
@torch.no_grad()
def forward(self, features: Tensor, frame_id: Tensor, **_: Any) -> dict[str, Tensor]:
    """Persist one frame's feature tensor as ``{prefix}_{frame:06d}.npy``.

    Parameters
    ----------
    features : Tensor
        ``[B, N, D]`` float32. The batch dimension is squeezed before saving.
    frame_id : Tensor
        ``(1,)`` int64 scalar frame index.

    Returns
    -------
    dict
        Empty dict (sink node).
    """
    target_dir = Path(self.output_dir)
    # Create the output directory only once, on the first write.
    if not self._dir_created:
        target_dir.mkdir(parents=True, exist_ok=True)
        self._dir_created = True

    frame_no = int(frame_id.item())
    filename = f"{self.prefix}_{frame_no:06d}.npy"
    # [B, N, D] -> [N, D] before serialization.
    np.save(target_dir / filename, features.squeeze(0).cpu().numpy())

    return {}

Video Outputs

video

Video utilities: frame iteration, datasets, Lightning DataModule, and export nodes.

ToVideoNode

ToVideoNode(
    output_video_path,
    frame_rate=10.0,
    frame_rotation=None,
    codec="mp4v",
    overlay_title=None,
    **kwargs,
)

Bases: Node

Write incoming RGB frames directly to a video file.

This node opens a single OpenCV VideoWriter and appends frames on each forward call. It is intended for streaming pipelines where frames arrive incrementally.

Parameters:

Name Type Description Default
output_video_path str

Output path for the generated video file (for example .mp4).

required
frame_rate float

Video frame rate in frames per second. Must be positive. Default is 10.0.

10.0
frame_rotation int | None

Optional frame rotation in degrees. Supported values are -90, 90, 180 (and aliases 270, -270, -180). Positive values rotate counterclockwise; negative values rotate clockwise. Default is None (no rotation).

None
codec str

FourCC codec string (length 4). Default is "mp4v".

'mp4v'
overlay_title str | None

Optional static title rendered at the top center with its own slim darkened background block. Default is None.

None
Source code in cuvis_ai/node/video.py
def __init__(
    self,
    output_video_path: str,
    frame_rate: float = 10.0,
    frame_rotation: int | None = None,
    codec: str = "mp4v",
    overlay_title: str | None = None,
    **kwargs: Any,
) -> None:
    """Validate settings and prepare a lazily-opened video writer."""
    # Reject impossible configurations up front.
    if frame_rate <= 0:
        raise ValueError("frame_rate must be > 0")
    if len(codec) != 4:
        raise ValueError("codec must be a 4-character FourCC string")
    if frame_rotation not in {None, 0, 90, -90, 180, -180, 270, -270}:
        raise ValueError(
            "frame_rotation must be one of: None, 0, 90, -90, 180, -180, 270, -270"
        )

    # An empty or whitespace-only title is treated as "no title".
    title: str | None = None
    if overlay_title is not None:
        stripped = str(overlay_title).strip()
        if stripped:
            title = stripped

    self.output_video_path = Path(output_video_path)
    self.frame_rate = float(frame_rate)
    self.frame_rotation = self._normalize_rotation(frame_rotation)
    self.codec = codec
    self.overlay_title = title
    # Writer and frame size are established on the first frame.
    self._writer: cv2.VideoWriter | None = None
    self._frame_size: tuple[int, int] | None = None

    self.output_video_path.parent.mkdir(parents=True, exist_ok=True)

    super().__init__(
        output_video_path=output_video_path,
        frame_rate=frame_rate,
        frame_rotation=frame_rotation,
        codec=codec,
        overlay_title=self.overlay_title,
        **kwargs,
    )
forward
forward(rgb_image, frame_id=None, context=None, **_)

Append incoming RGB frames to the configured video file.

Source code in cuvis_ai/node/video.py
def forward(
    self,
    rgb_image: torch.Tensor,
    frame_id: torch.Tensor | None = None,
    context: Context | None = None,  # noqa: ARG002
    **_: Any,
) -> dict[str, str]:
    """Append incoming RGB frames to the configured video file."""
    batch = self._to_uint8_batch(rgb_image)

    for idx, frame in enumerate(batch):
        self._draw_title_overlay(frame)
        # Stamp the frame index if a matching id was supplied.
        if frame_id is not None and idx < len(frame_id):
            label = f"frame {int(frame_id[idx].item())}"
            draw_text(frame, 8, 8, label, (255, 255, 255), scale=2, bg=True)
        frame = self._rotate_frame(frame)
        h, w = int(frame.shape[0]), int(frame.shape[1])
        if self._writer is None:
            # First frame decides the video geometry.
            self._init_writer(height=h, width=w)
        elif self._frame_size != (h, w):
            raise ValueError(
                f"All frames must share one size. Expected {self._frame_size}, got {(h, w)}"
            )

        # RGB -> BGR for OpenCV writer (reverse the channel axis).
        self._writer.write(frame.flip(-1).numpy())

    return {"video_path": str(self.output_video_path)}
close
close()

Release the underlying video writer if it exists.

Source code in cuvis_ai/node/video.py
def close(self) -> None:
    """Release the underlying video writer if it exists."""
    writer = self._writer
    if writer is None:
        return
    writer.release()
    self._writer = None

VideoFrameNode

Bases: Node

Passthrough source node that receives RGB frames from the batch.

forward
forward(rgb_image, frame_id=None, **_)

Pass through RGB frames and optional frame IDs from the batch.

Source code in cuvis_ai/node/video.py
def forward(
    self,
    rgb_image: torch.Tensor,
    frame_id: torch.Tensor | None = None,
    **_: Any,
) -> dict[str, torch.Tensor]:
    """Pass through RGB frames and the optional frame-ID tensor unchanged."""
    outputs: dict[str, torch.Tensor] = {"rgb_image": rgb_image}
    if frame_id is None:
        return outputs
    outputs["frame_id"] = frame_id
    return outputs

Common Tracking Sink Patterns

YOLO / tracker -> DetectionCocoJsonNode or CocoTrackBBoxWriter -> JSON output
SAM3 / mask tracker -> CocoTrackMaskWriter -> tracking JSON output
RGB or overlays -> ToVideoNode -> MP4 output
DeepEIOU embeddings -> NumpyFeatureWriterNode -> .npy output