Status: Needs Review
This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.
Port System Deep Dive¶
Typed, validated connections between nodes enable robust data flow in Cuvis.AI pipelines.
Ports provide explicit communication interfaces with type safety, shape validation, optional connections, and symbolic dimension resolution.
Key capabilities:
- Typed I/O with dtype and shape specifications
- Automatic connection compatibility validation
- Support for optional and variadic ports
- Symbolic dimensions resolved from node attributes
- Data flow patterns (fan-in, fan-out, multi-port)
Port Architecture¶
PortSpec Definition¶
Dataclass specifying the contract for a port.
from dataclasses import dataclass
@dataclass
class PortSpec:
dtype: Any # Data type (torch.float32, etc.)
shape: tuple[int | str, ...] # Shape with -1 for flexible, strings for symbolic
description: str = "" # Documentation
optional: bool = False # Connection required?
Port Types¶
InputPort: Receives data into a node
node.inputs.data # InputPort(node, "data", spec)
pipeline.connect(source_port, target_node.inputs.data)
OutputPort: Emits data from a node
node.outputs.scores # OutputPort(node, "scores", spec)
pipeline.connect(source_node.outputs.scores, target_port)
Shape Specifications¶
Three dimension types:
Flexible (-1): Accept any size
Fixed (integers): Require exact size
Symbolic (strings): Resolve from node attributes
Connection Validation¶
flowchart LR
A[Source Port] -->|dtype check| B{Compatible?}
B -->|No| C[PortCompatibilityError]
B -->|Yes| D[Resolve symbolic dims]
D -->|shape check| E{Compatible?}
E -->|No| C
E -->|Yes| F[Connection Valid]
classDef success fill:#d4edda,stroke:#28a745,color:#155724
classDef error fill:#f8d7da,stroke:#dc3545,color:#721c24
class F success
class C error
Compatibility Rules:
| Source | Target | Compatible? |
|---|---|---|
torch.float32 |
torch.float32 |
✅ Yes |
torch.float32 |
torch.float64 |
❌ No |
torch.Tensor |
torch.float32 |
✅ Yes |
(-1, -1, 61) |
(-1, -1, 61) |
✅ Yes |
(-1, -1, 61) |
(-1, -1, -1) |
✅ Yes (flexible target) |
(-1, -1, 61) |
(-1, -1, 30) |
❌ No (fixed mismatch) |
Connection API¶
# Single connection
pipeline.connect((source_node.result, target_node.data))
# Batch connections
pipeline.connect(
(node1.data, node2.data),
(node2.result, node3.features),
(node3.scores, node4.predictions)
)
# Nodes auto-added if not present
pipeline.connect((new_node1.data, new_node2.data))
Simplified Syntax¶
# Explicit (always works)
pipeline.connect((normalizer.outputs.normalized, rx_node.inputs.data))
# Simplified (when unambiguous)
pipeline.connect((normalizer.normalized, rx_node.data))
# Use explicit syntax only when node has same-named input and output port
Port Definition¶
Input Ports¶
class ProcessingNode(Node):
INPUT_SPECS = {
"data": PortSpec(
dtype=torch.float32,
shape=(-1, -1, -1, -1),
description="Input hyperspectral cube",
optional=False
),
"mask": PortSpec(
dtype=torch.bool,
shape=(-1, -1, -1),
description="Optional binary mask",
optional=True
),
"weights": PortSpec(
dtype=torch.float32,
shape=("n_channels",), # Symbolic
description="Per-channel weights"
)
}
Output Ports¶
class AnalysisNode(Node):
OUTPUT_SPECS = {
"scores": PortSpec(
dtype=torch.float32,
shape=(-1,),
description="Anomaly scores per sample"
),
"embeddings": PortSpec(
dtype=torch.float32,
shape=(-1, 128),
description="Feature embeddings"
),
"metadata": PortSpec(
dtype=dict,
shape=(), # Scalar
description="Processing metadata"
)
}
Symbolic Dimension Resolution¶
from cuvis_ai.pipeline.ports import DimensionResolver
class SelectorNode(Node):
def __init__(self, n_select: int, **kwargs):
self.n_select = n_select # Store before super().__init__()
super().__init__(**kwargs)
OUTPUT_SPECS = {
"selected": PortSpec(
torch.float32,
(-1, -1, -1, "n_select") # Resolved at runtime
)
}
# Resolution
selector = SelectorNode(n_select=10)
resolved_shape = DimensionResolver.resolve(
shape=(-1, -1, -1, "n_select"),
node=selector
)
# Result: (-1, -1, -1, 10)
Variadic Ports (Fan-in)¶
Multiple connections to one port.
class AggregatorNode(Node):
INPUT_SPECS = {
"features": [PortSpec(
dtype=torch.float32,
shape=(-1, -1),
description="Feature tensors to aggregate"
)]
}
def forward(self, features: list[torch.Tensor], **_):
concatenated = torch.cat(features, dim=-1)
return {"aggregated": concatenated}
# Connect multiple sources
pipeline.connect(
(node1.feat1, aggregator.features),
(node2.feat2, aggregator.features),
(node3.feat3, aggregator.features)
)
Data Flow Patterns¶
Pattern 1: Linear (Single Input/Output)¶
graph LR
A[DataLoader] -->|data| B[Normalizer]
B -->|normalized| C[Analyzer]
C -->|scores| D[Output]
pipeline.connect(
(loader.data, normalizer.data),
(normalizer.normalized, analyzer.data),
(analyzer.scores, output_node.data)
)
Pattern 2: Fan-in (Multiple Sources)¶
graph LR
A[Source A] -->|features_a| C[Merger]
B[Source B] -->|features_b| C
C -->|merged| D[Downstream]
Pattern 3: Fan-out (Multiple Targets)¶
graph LR
A[Preprocessor] -->|data| B[Branch A]
A -->|data| C[Branch B]
A -->|data| D[Branch C]
pipeline.connect(
(preprocessor.data, branch_a.data),
(preprocessor.data, branch_b.data),
(preprocessor.data, branch_c.data)
)
Pattern 4: Multi-Port Node¶
graph LR
A[Analyzer] -->|scores| B[Decider]
A -->|embeddings| C[Visualizer]
A -->|metadata| D[Logger]
pipeline.connect(
(analyzer.scores, decider.predictions),
(analyzer.embeddings, visualizer.features),
(analyzer.metadata, logger.data)
)
Pattern 5: Complete Pipeline Example¶
This diagram shows a real anomaly detection pipeline with port-based data flow, including processing nodes, loss computation, and visualization:
graph LR
A[Input Batch] --> B[Port Distribution]
B --> C[MinMaxNormalizer.data]
C --> D[MinMaxNormalizer.normalized]
D --> E[SoftChannelSelector.data]
E --> F[SoftChannelSelector.selected]
F --> G[TrainablePCA.features]
G --> H[TrainablePCA.projected]
H --> I[RXGlobal.data]
I --> J[RXGlobal.scores]
E --> K[SelectorEntropyReg.weights]
F --> L[SelectorDiversityReg.weights]
J --> M[AnomalyBCEWithLogits.predictions]
J --> N[ScoreHeatmapVisualizer.data]
K --> O[SelectorEntropyReg.loss]
L --> P[SelectorDiversityReg.loss]
M --> Q[AnomalyBCEWithLogits.loss]
O --> R[GradientTrainer loss_nodes]
P --> R
Q --> R
N --> S[VisualizationManager.inputs]
R --> T[Backward Pass]
S --> U[Monitor Output]
pipeline.connect(
# Main processing chain
(data_node.cube, normalizer.data),
(normalizer.normalized, selector.data),
(selector.selected, pca.features),
(pca.projected, rx.data),
# Loss connections
(rx.scores, bce_loss.predictions),
(data_node.mask, bce_loss.targets),
(selector.weights, entropy_loss.weights),
(selector.weights, diversity_loss.weights),
# Visualization
(rx.scores, heatmap.scores),
)
Advanced Features¶
Optional Ports¶
class FlexibleNode(Node):
INPUT_SPECS = {
"data": PortSpec(..., optional=False),
"mask": PortSpec(..., optional=True),
}
def forward(self, data: torch.Tensor, mask: torch.Tensor | None = None, **_):
if mask is not None:
processed = data * mask.unsqueeze(-1)
else:
processed = data
return {"processed": processed}
# mask can be left unconnected
pipeline.connect((source.data, node.data))
Dynamic Port Resolution¶
class AdaptiveNode(Node):
def __init__(self, output_dim: int, **kwargs):
self.output_dim = output_dim
super().__init__(**kwargs)
INPUT_SPECS = {
"features": PortSpec(torch.float32, (-1, "input_dim"))
}
OUTPUT_SPECS = {
"transformed": PortSpec(torch.float32, (-1, "output_dim"))
}
@property
def input_dim(self):
return self.linear.in_features
Best Practices¶
| Practice | Guidance |
|---|---|
| Descriptive port names | anomaly_scores, feature_embeddings -- not out1, data |
| Explicit data types | dtype=torch.float32 -- not torch.Tensor (too generic) |
| Document port requirements | Use the description field with format, range, and channel info |
| Handle optional ports | Default None args to sensible values (e.g., all-ones mask) |
Troubleshooting
Type Mismatch (PortCompatibilityError: dtype mismatch) -- Use consistent dtypes:
Shape Mismatch (PortCompatibilityError: shape mismatch) -- Align node dimensions:
Missing Required Port -- Connect all required ports before running:
Symbolic Dimension Not Found (AttributeError) -- Store the attribute before super().__init__():