
Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


Ports API Reference

Complete API reference for the Typed I/O port system in CUVIS.AI.

Overview

The port system provides typed input/output interfaces for all nodes, enabling type-safe connections, runtime validation, and flexible pipeline construction. Each node defines its input and output ports using PortSpec objects.

Core Components

PortSpec

The PortSpec class defines the specification for a port, including its type, shape constraints, and metadata.

Attributes:

  • name: Port identifier
  • port_type: "input" or "output"
  • shape: Expected tensor shape with dimension constraints
  • dtype: Expected data type (optional)
  • description: Human-readable description
  • stage: Execution stage ("train", "eval", or "both")

Example:

from cuvis_ai_schemas.pipeline import PortSpec

# Define an input port for hyperspectral data
data_port = PortSpec(
    name="data",
    port_type="input",
    shape=(-1, -1, -1, -1),  # (batch, height, width, channels)
    description="Raw hyperspectral cube input"
)

# Define an output port for normalized data
normalized_port = PortSpec(
    name="normalized", 
    port_type="output",
    shape=(-1, -1, -1, -1),
    description="Normalized hyperspectral cube"
)

InputPort / OutputPort

Port instances that are attached to nodes and used for connections.

Creating Ports:

from cuvis_ai_schemas.pipeline import InputPort, OutputPort

# Create port instances (each port is bound to a node, a port name, and a spec)
input_port = InputPort(node=normalizer, name="data", spec=data_port)
output_port = OutputPort(node=normalizer, name="normalized", spec=normalized_port)

Port Compatibility Rules

Ports can be connected if they satisfy compatibility rules:

Shape Compatibility

  • Fixed dimensions must match exactly
  • Variable dimensions (-1) can match any size
  • Batch dimensions are typically variable

Type Compatibility

  • Input ports can only connect to output ports
  • Ports must have compatible data types
  • Stage constraints must be satisfied

Connection Validation

# Check whether the output spec can feed the input spec before connecting
compatible, reason = output_port.spec.is_compatible_with(
    input_port.spec,
    source_node=output_port.node,
    target_node=input_port.node,
)
if compatible:
    pipeline.connect(output_port, input_port)
else:
    print(f"Ports are incompatible: {reason}")

Node Port Declarations

Nodes declare their ports using INPUT_SPECS and OUTPUT_SPECS class attributes.

Example Node Implementation

from cuvis_ai_core.node.node import Node
from cuvis_ai_schemas.pipeline import PortSpec

class MinMaxNormalizer(Node):
    """Min-max normalization node."""

    # Input port specifications
    INPUT_SPECS = [
        PortSpec(
            name="data",
            port_type="input",
            shape=(-1, -1, -1, -1),
            description="Raw hyperspectral cube"
        )
    ]

    # Output port specifications  
    OUTPUT_SPECS = [
        PortSpec(
            name="normalized",
            port_type="output", 
            shape=(-1, -1, -1, -1),
            description="Normalized cube [0, 1]"
        )
    ]

    def __init__(self, eps=1e-6, use_running_stats=True):
        super().__init__()
        self.eps = eps
        self.use_running_stats = use_running_stats
        # Running statistics; in practice these are updated from the training data
        self.running_min = 0.0
        self.running_max = 1.0

    def forward(self, **inputs):
        data = inputs["data"]
        # Normalization logic here
        normalized = (data - self.running_min) / (self.running_max - self.running_min + self.eps)
        return {"normalized": normalized}

Port-Based Connections

Basic Connection

# Connect two nodes using their ports
pipeline.connect(normalizer.normalized, selector.data)

Multiple Connections

# Fan-in multiple outputs to a single input (e.g., monitoring artifacts)
pipeline.connect(
    (viz_mask.artifacts, tensorboard_node.artifacts),
    (viz_rgb.artifacts, tensorboard_node.artifacts),
)

Stage-Aware Connections

# Connect nodes for specific execution stages
pipeline.connect(normalizer.normalized, selector.data, stage="train")
pipeline.connect(selector.selected, pca.features, stage="both")

Loss Nodes Without an Aggregator

LossAggregator has been removed; the trainer now collects individual loss nodes directly. Register every loss/regularizer node with the GradientTrainer (or any custom trainer) and feed their inputs through standard port connections, as shown in examples/03_channel_selector.py.

pipeline.connect(
    (logit_head.logits, bce_loss.predictions),
    (data_node.mask, bce_loss.targets),
    (selector.weights, entropy_loss.weights),
    (selector.weights, diversity_loss.weights),
)

grad_trainer = GradientTrainer(
    pipeline=pipeline,
    datamodule=datamodule,
    loss_nodes=[bce_loss, entropy_loss, diversity_loss],
    metric_nodes=[metrics_anomaly],
    trainer_config=training_cfg.trainer,
    optimizer_config=training_cfg.optimizer,
)

Batch Distribution

The port system enables explicit batch distribution to specific input ports.

Single Input

# Distribute batch to a specific input port
outputs = pipeline.forward(batch={f"{normalizer.id}.data": input_data})

Multiple Inputs

# Distribute different data to different input ports
outputs = pipeline.forward(batch={
    f"{node1.id}.data1": data1,
    f"{node2.id}.data2": data2,
    f"{node3.id}.features": features
})

Batch Key Format

Batch keys follow the pattern: {node_id}.{port_name}
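
For example, if a node's id is the literal string "normalizer" (illustrative; in practice ids come from node.id and may be generated), its "data" input port is addressed as "normalizer.data":

# Equivalent to the f-string form used above: f"{normalizer.id}.data"
outputs = pipeline.forward(batch={"normalizer.data": input_data})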

Dimension Resolution

The port system automatically resolves variable dimensions during execution.

Dynamic Shape Resolution

# Port with variable dimensions
port_spec = PortSpec(
    name="features",
    port_type="input", 
    shape=(-1, -1, -1, -1)  # All dimensions variable
)

# During execution, dimensions are resolved from input data
# Input shape: (32, 64, 64, 100) → Output shape: (32, 64, 64, n_components)

Constraint Validation

# Port with fixed channel dimension
port_spec = PortSpec(
    name="features",
    port_type="input",
    shape=(-1, -1, -1, 100)  # Fixed channel dimension
)

# Connection will fail if channel dimension doesn't match

Common Port Patterns

Normalization Nodes

Input Ports:

  • data: Raw hyperspectral cube

Output Ports:

  • normalized: Normalized data

Feature Extraction

Input Ports:

  • features: Input features for transformation

Output Ports:

  • projected: Transformed features
  • explained_variance: Statistical metrics

Anomaly Detection

Input Ports:

  • data: Features for anomaly scoring

Output Ports:

  • scores: Anomaly detection scores
  • logits: Logit-transformed scores

Loss Nodes

Input Ports:

  • Variadic inputs for loss computation

Output Ports:

  • loss: Computed loss value
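
Putting these patterns together, a typical chain feeds the normalizer output into a feature extractor and the extracted features into an anomaly detector. A wiring sketch, assuming node variables named normalizer, pca, detector, and bce_loss that expose the ports listed above:

# Normalization -> feature extraction -> anomaly detection -> loss
pipeline.connect(normalizer.normalized, pca.features)
pipeline.connect(pca.projected, detector.data)
pipeline.connect(detector.logits, bce_loss.predictions)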

Error Handling

Port Not Found

try:
    pipeline.connect(normalizer.nonexistent, selector.data)
except AttributeError as e:
    print(f"Port error: {e}")
    # Error: 'MinMaxNormalizer' object has no attribute 'nonexistent'

Incompatible Ports

try:
    pipeline.connect(normalizer.normalized, pca.features)
except PortCompatibilityError as e:
    print(f"Compatibility error: {e}")
    # Error: Port shapes are incompatible: (-1, -1, -1, 100) vs (-1, -1, -1, 3)

Missing Batch Distribution

try:
    outputs = pipeline.forward(batch=input_data)
except KeyError as e:
    print(f"Batch error: {e}")
    # Error: Unable to find input port for batch key

Advanced Usage

Custom Port Specifications

import torch

# Create custom port with specific constraints
custom_port = PortSpec(
    name="embedding",
    port_type="output",
    shape=(-1, 512),  # Fixed embedding dimension
    dtype=torch.float32,
    description="Feature embeddings"
)

Port Inspection

# Inspect port properties via the port proxy and its spec
port = normalizer.normalized
print(f"Port name: {port.name}")
print(f"Expected shape: {port.spec.shape}")
print(f"Dtype: {port.spec.dtype}")
print(f"Description: {port.spec.description}")

Connection Graph

# Get all connections in the pipeline
connections = pipeline.get_connections()
for source, target in connections:
    print(f"{source.node.name}.{source.name}{target.node.name}.{target.name}")

Best Practices

  1. Use Descriptive Port Names: Choose names that clearly indicate the port's purpose
  2. Define Shape Constraints: Use fixed dimensions when possible for early error detection
  3. Document Ports: Provide clear descriptions for each port
  4. Test Port Compatibility: Validate connections during development
  5. Use Stage Filtering: Leverage stage-aware execution for performance

API Reference

pipeline

Pipeline structure schemas.

ConnectionConfig

Bases: _BaseConfig

Connection between two nodes.

Attributes:

  • from_node (str): Source node ID
  • from_port (str): Source port name
  • to_node (str): Target node ID
  • to_port (str): Target port name
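
Example (a construction sketch; the import path is assumed to match the other classes on this page and the node/port names are illustrative):

from cuvis_ai_schemas.pipeline import ConnectionConfig

# Route the "normalized" output of node "normalizer" into the "data" input of node "selector"
connection = ConnectionConfig(
    from_node="normalizer",
    from_port="normalized",
    to_node="selector",
    to_port="data",
)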

NodeConfig

Bases: _BaseConfig

Node configuration within a pipeline.

Attributes:

  • id (str): Unique node identifier
  • class_name (str): Fully-qualified class name (e.g., 'my_package.MyNode'). Alias: 'class' for backward compatibility.
  • params (dict[str, Any]): Node parameters/hyperparameters. Alias: 'hparams' for backward compatibility.
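
Example (a construction sketch; the import path, node id, and params are illustrative assumptions):

from cuvis_ai_schemas.pipeline import NodeConfig

node = NodeConfig(
    id="normalizer",
    class_name="my_package.MyNode",  # the 'class' alias is also accepted
    params={"eps": 1e-6},            # the 'hparams' alias is also accepted
)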

PipelineConfig

Bases: _BaseConfig

Pipeline structure configuration.

Attributes:

  • name (str): Pipeline name
  • nodes (list[NodeConfig] | list[dict[str, Any]]): Node definitions (can be NodeConfig or dict for flexibility)
  • connections (list[ConnectionConfig] | list[dict[str, Any]]): Node connections (can be ConnectionConfig or dict for flexibility)
  • frozen_nodes (list[str]): Node IDs to keep frozen during training
  • metadata (PipelineMetadata | None): Optional pipeline metadata
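
Example (a construction and round-trip sketch; it assumes frozen_nodes and metadata are optional, uses the dict form permitted for nodes and connections, and all names are illustrative):

from cuvis_ai_schemas.pipeline import PipelineConfig

config = PipelineConfig(
    name="demo_pipeline",
    nodes=[
        {"id": "normalizer", "class_name": "my_package.MyNode", "params": {"eps": 1e-6}},
        {"id": "selector", "class_name": "my_package.MySelector", "params": {}},
    ],
    connections=[
        {"from_node": "normalizer", "from_port": "normalized",
         "to_node": "selector", "to_port": "data"},
    ],
)

# Round-trip through YAML
config.save_to_file("pipeline.yaml")
restored = PipelineConfig.load_from_file("pipeline.yaml")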

to_proto

to_proto()

Convert to proto message.

Requires cuvis-ai-schemas[proto] to be installed.

Returns:

  • PipelineConfig: Proto message representation

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
def to_proto(self) -> cuvis_ai_pb2.PipelineConfig:
    """Convert to proto message.

    Requires cuvis-ai-schemas[proto] to be installed.

    Returns
    -------
    cuvis_ai_pb2.PipelineConfig
        Proto message representation
    """
    try:
        from cuvis_ai_schemas.grpc.v1 import cuvis_ai_pb2
    except ImportError as exc:
        msg = "Proto support not installed. Install with: pip install cuvis-ai-schemas[proto]"
        raise ImportError(msg) from exc

    return cuvis_ai_pb2.PipelineConfig(config_bytes=self.model_dump_json().encode("utf-8"))

from_proto classmethod

from_proto(proto_config)

Load from proto message.

Parameters:

  • proto_config (PipelineConfig, required): Proto message to deserialize

Returns:

  • PipelineConfig: Loaded configuration

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
@classmethod
def from_proto(cls, proto_config: cuvis_ai_pb2.PipelineConfig) -> PipelineConfig:
    """Load from proto message.

    Parameters
    ----------
    proto_config : cuvis_ai_pb2.PipelineConfig
        Proto message to deserialize

    Returns
    -------
    PipelineConfig
        Loaded configuration
    """
    return cls.model_validate_json(proto_config.config_bytes.decode("utf-8"))

to_json

to_json()

Convert to JSON string.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
def to_json(self) -> str:
    """Convert to JSON string."""
    return self.model_dump_json()

from_json classmethod

from_json(payload)

Load from JSON string.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
@classmethod
def from_json(cls, payload: str) -> PipelineConfig:
    """Load from JSON string."""
    return cls.model_validate_json(payload)

to_dict

to_dict()

Convert to dictionary.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary."""
    return self.model_dump()

from_dict classmethod

from_dict(data)

Load from dictionary.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PipelineConfig:
    """Load from dictionary."""
    return cls.model_validate(data)

save_to_file

save_to_file(path)

Save pipeline configuration to YAML file.

Parameters:

  • path (str | Path, required): Output file path

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
def save_to_file(self, path: str | Path) -> None:
    """Save pipeline configuration to YAML file.

    Parameters
    ----------
    path : str | Path
        Output file path
    """
    from pathlib import Path

    output_path = Path(path)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        yaml.safe_dump(self.model_dump(), f, sort_keys=False)

load_from_file classmethod

load_from_file(path)

Load pipeline configuration from YAML file.

Parameters:

  • path (str | Path, required): Input file path

Returns:

  • PipelineConfig: Loaded configuration

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
@classmethod
def load_from_file(cls, path: str | Path) -> PipelineConfig:
    """Load pipeline configuration from YAML file.

    Parameters
    ----------
    path : str | Path
        Input file path

    Returns
    -------
    PipelineConfig
        Loaded configuration
    """
    from pathlib import Path

    with Path(path).open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return cls.from_dict(data)

PipelineMetadata

Bases: _BaseConfig

Pipeline metadata for documentation and discovery.

Attributes:

  • name (str): Pipeline name
  • description (str): Human-readable description
  • created (str): Creation timestamp (ISO format)
  • tags (list[str]): Tags for categorization and search
  • author (str): Author name or email
  • cuvis_ai_version (str): Version of cuvis-ai-schemas used
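
Example (a construction sketch; the import path and all field values are illustrative):

from cuvis_ai_schemas.pipeline import PipelineMetadata

metadata = PipelineMetadata(
    name="channel_selection",
    description="Channel selection pipeline for anomaly detection",
    created="2025-01-01T00:00:00",
    tags=["hyperspectral", "anomaly-detection"],
    author="jane.doe@example.com",
    cuvis_ai_version="0.0.0",  # illustrative version string
)
print(metadata.to_dict())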

to_dict

to_dict()

Convert to dictionary.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
def to_dict(self) -> dict[str, Any]:
    """Convert to dictionary."""
    return self.model_dump()

from_dict classmethod

from_dict(data)

Load from dictionary.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PipelineMetadata:
    """Load from dictionary."""
    return cls.model_validate(data)

to_proto

to_proto()

Convert to proto message.

Requires cuvis-ai-schemas[proto] to be installed.

Returns:

  • PipelineMetadata: Proto message representation

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/config.py
def to_proto(self) -> cuvis_ai_pb2.PipelineMetadata:
    """Convert to proto message.

    Requires cuvis-ai-schemas[proto] to be installed.

    Returns
    -------
    cuvis_ai_pb2.PipelineMetadata
        Proto message representation
    """
    try:
        from cuvis_ai_schemas.grpc.v1 import cuvis_ai_pb2
    except ImportError as exc:
        msg = "Proto support not installed. Install with: pip install cuvis-ai-schemas[proto]"
        raise ImportError(msg) from exc

    return cuvis_ai_pb2.PipelineMetadata(
        name=self.name,
        description=self.description,
        created=self.created,
        tags=list(self.tags),
        author=self.author,
        cuvis_ai_version=self.cuvis_ai_version,
    )

PortCompatibilityError

Bases: Exception

Raised when attempting to connect incompatible ports.

DimensionResolver

Utility class for resolving symbolic dimensions in port shapes.

resolve staticmethod

resolve(shape, node)

Resolve symbolic dimensions to concrete values.

Parameters:

  • shape (tuple[int | str, ...], required): Shape specification with flexible (-1), fixed (int), or symbolic (str) dims.
  • node (Any | None, required): Node instance to resolve symbolic dimensions from.

Returns:

  • tuple[int, ...]: Resolved shape with concrete integer values.

Raises:

  • AttributeError: If a symbolic dimension references a non-existent node attribute.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/ports.py
@staticmethod
def resolve(
    shape: tuple[int | str, ...],
    node: Any | None,
) -> tuple[int, ...]:
    """Resolve symbolic dimensions to concrete values.

    Parameters
    ----------
    shape : tuple[int | str, ...]
        Shape specification with flexible (-1), fixed (int), or symbolic (str) dims.
    node : Any | None
        Node instance to resolve symbolic dimensions from.

    Returns
    -------
    tuple[int, ...]
        Resolved shape with concrete integer values.

    Raises
    ------
    AttributeError
        If symbolic dimension references non-existent node attribute.
    """
    resolved: list[int] = []
    for dim in shape:
        if isinstance(dim, int):
            # Flexible (-1) or fixed (int) dimension
            resolved.append(dim)
            continue

        if isinstance(dim, str):
            # Symbolic dimension - resolve from node
            if node is None:
                raise ValueError(
                    f"Cannot resolve symbolic dimension '{dim}' without node instance"
                )
            if not hasattr(node, dim):
                node_label = getattr(node, "id", None) or node
                raise AttributeError(
                    f"Node {node_label} has no attribute '{dim}' for dimension resolution"
                )

            value = getattr(node, dim)
            if not isinstance(value, int):
                raise TypeError(f"Dimension '{dim}' resolved to {type(value)}, expected int")
            resolved.append(value)
            continue

        raise TypeError(f"Invalid dimension type: {type(dim)}")

    return tuple(resolved)
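
Usage sketch (assumes DimensionResolver is importable from cuvis_ai_schemas.pipeline like the other classes on this page; _Demo is a stand-in for a node exposing an integer n_components attribute):

from cuvis_ai_schemas.pipeline import DimensionResolver

class _Demo:
    n_components = 3  # symbolic dimension source (hypothetical node attribute)

# Flexible (-1) dims pass through unchanged; "n_components" is read from the node
resolved = DimensionResolver.resolve((-1, -1, -1, "n_components"), _Demo())
print(resolved)  # (-1, -1, -1, 3)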

InputPort

InputPort(node, name, spec)

Proxy object representing a node's input port.

Initialize an input port proxy.

Parameters:

  • node (Any, required): The node instance that owns this port.
  • name (str, required): The name of the port on the node.
  • spec (PortSpec, required): The port specification defining type and shape constraints.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/ports.py
def __init__(self, node: Any, name: str, spec: PortSpec) -> None:
    """Initialize an input port proxy.

    Parameters
    ----------
    node : Any
        The node instance that owns this port.
    name : str
        The name of the port on the node.
    spec : PortSpec
        The port specification defining type and shape constraints.
    """
    self.node = node
    self.name = name
    self.spec = spec

OutputPort

OutputPort(node, name, spec)

Proxy object representing a node's output port.

Initialize an output port proxy.

Parameters:

  • node (Any, required): The node instance that owns this port.
  • name (str, required): The name of the port on the node.
  • spec (PortSpec, required): The port specification defining type and shape constraints.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/ports.py
def __init__(self, node: Any, name: str, spec: PortSpec) -> None:
    """Initialize an output port proxy.

    Parameters
    ----------
    node : Any
        The node instance that owns this port.
    name : str
        The name of the port on the node.
    spec : PortSpec
        The port specification defining type and shape constraints.
    """
    self.node = node
    self.name = name
    self.spec = spec

PortSpec dataclass

PortSpec(dtype, shape, description='', optional=False)

Specification for a node input or output port.
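
Construction sketch based on the dataclass signature above (field values are illustrative):

import torch
from cuvis_ai_schemas.pipeline import PortSpec

spec = PortSpec(
    dtype=torch.Tensor,                  # generic tensor dtype
    shape=(-1, -1, -1, "n_components"),  # flexible dims plus one symbolic dim
    description="Projected features",
    optional=False,
)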

resolve_shape

resolve_shape(node)

Resolve symbolic dimensions in shape using node attributes.

Parameters:

  • node (Any, required): Node instance to resolve symbolic dimensions from.

Returns:

  • tuple[int, ...]: Resolved shape with all symbolic dimensions replaced by concrete integer values.

See Also

DimensionResolver.resolve : Underlying resolution logic.

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/ports.py
def resolve_shape(self, node: Any) -> tuple[int, ...]:
    """Resolve symbolic dimensions in shape using node attributes.

    Parameters
    ----------
    node : Any
        Node instance to resolve symbolic dimensions from.

    Returns
    -------
    tuple[int, ...]
        Resolved shape with all symbolic dimensions replaced by concrete integer values.

    See Also
    --------
    DimensionResolver.resolve : Underlying resolution logic.
    """
    return DimensionResolver.resolve(self.shape, node)

is_compatible_with

is_compatible_with(other, source_node, target_node)

Check if this port can connect to another port.

Parameters:

  • other (PortSpec | list[PortSpec], required): Target port spec. If a list, it's a variadic port and the spec is extracted from it.
  • source_node (Any | None, required): Source node for dimension resolution
  • target_node (Any | None, required): Target node for dimension resolution

Returns:

  • tuple[bool, str]: (is_compatible, error_message)

Source code in .venv/lib/python3.11/site-packages/cuvis_ai_schemas/pipeline/ports.py
def is_compatible_with(
    self,
    other: PortSpec | list[PortSpec],
    source_node: Any | None,
    target_node: Any | None,
) -> tuple[bool, str]:
    """Check if this port can connect to another port.

    Parameters
    ----------
    other : PortSpec | list[PortSpec]
        Target port spec. If a list, it's a variadic port - extract the spec.
    source_node : Any | None
        Source node for dimension resolution
    target_node : Any | None
        Target node for dimension resolution

    Returns
    -------
    tuple[bool, str]
        (is_compatible, error_message)
    """

    def _format_dtype(value: Any) -> str:
        """Format a dtype value for display in error messages.

        Parameters
        ----------
        value : Any
            A dtype value (torch.dtype, type, or other).

        Returns
        -------
        str
            Human-readable string representation of the dtype.
        """
        if isinstance(value, torch.dtype):
            return str(value)
        return getattr(value, "__name__", str(value))

    def _is_tensor_related(dtype: Any) -> bool:
        """Check if dtype is torch.Tensor or a specific torch.dtype.

        Parameters
        ----------
        dtype : Any
            The dtype to check.

        Returns
        -------
        bool
            True if dtype is torch.Tensor or a torch.dtype instance.
        """
        return dtype is torch.Tensor or isinstance(dtype, torch.dtype)

    # Handle variadic ports (list-based specs)
    if isinstance(other, list):
        if not other:
            return False, "Variadic port has empty spec list"
        # Extract the actual PortSpec from the list
        other = other[0]

    # Check dtype compatibility with smart tensor handling
    source_is_tensor = _is_tensor_related(self.dtype)
    target_is_tensor = _is_tensor_related(other.dtype)

    if source_is_tensor and target_is_tensor:
        # Both tensor-related types
        # Allow if either is generic torch.Tensor OR both are same dtype
        if not (
            self.dtype is torch.Tensor
            or other.dtype is torch.Tensor
            or self.dtype == other.dtype
        ):
            return False, (
                f"Dtype mismatch: source has {_format_dtype(self.dtype)}, "
                f"target expects {_format_dtype(other.dtype)}"
            )
    elif self.dtype != other.dtype:
        # Non-tensor types must match exactly
        return False, (
            f"Dtype mismatch: source has {_format_dtype(self.dtype)}, "
            f"target expects {_format_dtype(other.dtype)}"
        )

    # Resolve shapes
    try:
        source_shape = self.resolve_shape(source_node) if source_node else self.shape
        target_shape = other.resolve_shape(target_node) if target_node else other.shape
    except (AttributeError, ValueError, TypeError) as exc:
        return False, f"Shape resolution failed: {exc}"

    # Check rank compatibility
    if len(source_shape) != len(target_shape):
        return False, (
            f"Shape rank mismatch: source has {len(source_shape)} dimensions, "
            f"target expects {len(target_shape)}"
        )

    # Check dimension-by-dimension compatibility
    for idx, (src_dim, tgt_dim) in enumerate(zip(source_shape, target_shape, strict=True)):
        # -1 means flexible, always compatible
        if src_dim == -1 or tgt_dim == -1:
            continue

        # Both fixed - must match exactly
        if src_dim != tgt_dim:
            return False, (
                f"Dimension {idx} mismatch: source has size {src_dim}, target expects {tgt_dim}"
            )

    return True, ""
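
Usage sketch following the source above (both specs use the generic torch.Tensor dtype and no node instances, so the declared shapes are compared directly):

import torch
from cuvis_ai_schemas.pipeline import PortSpec

source = PortSpec(dtype=torch.Tensor, shape=(-1, -1, -1, 100), description="PCA features")
target = PortSpec(dtype=torch.Tensor, shape=(-1, -1, -1, -1), description="Detector input")

ok, reason = source.is_compatible_with(target, source_node=None, target_node=None)
print(ok, reason)  # True, with an empty error message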

See Also