Status: Needs Review
This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.
Port System Deep Dive¶
Typed, validated connections between nodes enable robust data flow in CUVIS.AI pipelines.
Ports provide explicit communication interfaces with type safety, shape validation, optional connections, and symbolic dimension resolution.
Key capabilities:
- Typed I/O with dtype and shape specifications
- Automatic connection compatibility validation
- Support for optional and variadic ports
- Symbolic dimensions resolved from node attributes
- Data flow patterns (fan-in, fan-out, multi-port)
Port Architecture¶
PortSpec Definition¶
A dataclass that specifies the contract for a single port:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class PortSpec:
    dtype: Any                     # Data type (torch.float32, etc.)
    shape: tuple[int | str, ...]   # -1 for flexible, strings for symbolic dims
    description: str = ""          # Documentation
    optional: bool = False         # True if the connection may be omitted
```
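For example, a spec for a required BHWC hyperspectral input might look like this (values are illustrative; `torch` is assumed to be imported):

```python
cube_spec = PortSpec(
    dtype=torch.float32,
    shape=(-1, -1, -1, 61),   # any batch/height/width, exactly 61 channels
    description="Hyperspectral cube in BHWC layout",
    optional=False,           # a connection is required
)
```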
Port Types¶

**InputPort**: receives data into a node

```python
node.inputs.data  # InputPort(node, "data", spec)
pipeline.connect((source_port, target_node.inputs.data))
```

**OutputPort**: emits data from a node

```python
node.outputs.scores  # OutputPort(node, "scores", spec)
pipeline.connect((source_node.outputs.scores, target_port))
```
Shape Specifications¶

Shapes mix three dimension types, which can appear together in a single shape (see the sketch below):

- **Flexible** (`-1`): accept any size
- **Fixed** (integers): require an exact size
- **Symbolic** (strings): resolve from node attributes
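A minimal sketch combining all three kinds in one spec (the attribute name `n_select` follows the selector example later on this page):

```python
spec = PortSpec(
    dtype=torch.float32,
    # -1: any size; 128: exactly 128; "n_select": read from node.n_select
    shape=(-1, 128, "n_select"),
)
```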
Connection Validation¶
```mermaid
flowchart LR
    A[Source Port] -->|dtype check| B{Compatible?}
    B -->|No| C[PortCompatibilityError]
    B -->|Yes| D[Resolve symbolic dims]
    D -->|shape check| E{Compatible?}
    E -->|No| C
    E -->|Yes| F[Connection Valid]
    style A fill:#e1f5ff
    style F fill:#d4edda
    style C fill:#f8d7da
```
Compatibility Rules:

| Source | Target | Compatible? |
|---|---|---|
| `torch.float32` | `torch.float32` | ✅ Yes |
| `torch.float32` | `torch.float64` | ❌ No |
| `torch.Tensor` | `torch.float32` | ✅ Yes |
| `(-1, -1, 61)` | `(-1, -1, 61)` | ✅ Yes |
| `(-1, -1, 61)` | `(-1, -1, -1)` | ✅ Yes (flexible target) |
| `(-1, -1, 61)` | `(-1, -1, 30)` | ❌ No (fixed mismatch) |
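Incompatibilities are reported at connect time, before any data flows; a sketch (node names are illustrative, and the import location of `PortCompatibilityError` may differ):

```python
try:
    # selector emits (-1, -1, -1, 10); rx_node expects (-1, -1, -1, 61)
    pipeline.connect((selector.selected, rx_node.data))
except PortCompatibilityError as err:
    print(err)  # "Cannot connect ports: shape mismatch ..."
```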
Connection API¶
```python
# Single connection
pipeline.connect((source_node.result, target_node.data))

# Batch connections
pipeline.connect(
    (node1.data, node2.data),
    (node2.result, node3.features),
    (node3.scores, node4.predictions),
)

# Nodes are auto-added to the pipeline if not already present
pipeline.connect((new_node1.data, new_node2.data))
```
Simplified Syntax¶
```python
# Explicit (always works)
pipeline.connect((normalizer.outputs.normalized, rx_node.inputs.data))

# Simplified (when unambiguous)
pipeline.connect((normalizer.normalized, rx_node.data))
```

The explicit syntax is only required when a node has an input port and an output port with the same name.
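A sketch of that ambiguous case, using a hypothetical pass-through node:

```python
# Hypothetical node with both an input and an output port named "data"
pipeline.connect((upstream.scores, passthrough.inputs.data))   # explicit input
pipeline.connect((passthrough.outputs.data, downstream.data))  # explicit output
# `passthrough.data` alone would be ambiguous here
```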
Port Definition¶
Input Ports¶
```python
class ProcessingNode(Node):
    INPUT_SPECS = {
        "data": PortSpec(
            dtype=torch.float32,
            shape=(-1, -1, -1, -1),
            description="Input hyperspectral cube",
            optional=False,
        ),
        "mask": PortSpec(
            dtype=torch.bool,
            shape=(-1, -1, -1),
            description="Optional binary mask",
            optional=True,
        ),
        "weights": PortSpec(
            dtype=torch.float32,
            shape=("n_channels",),  # Symbolic: resolved from node.n_channels
            description="Per-channel weights",
        ),
    }
```
Output Ports¶
```python
class AnalysisNode(Node):
    OUTPUT_SPECS = {
        "scores": PortSpec(
            dtype=torch.float32,
            shape=(-1,),
            description="Anomaly scores per sample",
        ),
        "embeddings": PortSpec(
            dtype=torch.float32,
            shape=(-1, 128),
            description="Feature embeddings",
        ),
        "metadata": PortSpec(
            dtype=dict,
            shape=(),  # Scalar (no tensor dimensions)
            description="Processing metadata",
        ),
    }
```
Symbolic Dimension Resolution¶
```python
from cuvis_ai.pipeline.ports import DimensionResolver

class SelectorNode(Node):
    def __init__(self, n_select: int, **kwargs):
        self.n_select = n_select  # Store before super().__init__()
        super().__init__(**kwargs)

    OUTPUT_SPECS = {
        "selected": PortSpec(
            torch.float32,
            (-1, -1, -1, "n_select"),  # Resolved at runtime
        )
    }

# Resolution
selector = SelectorNode(n_select=10)
resolved_shape = DimensionResolver.resolve(
    shape=(-1, -1, -1, "n_select"),
    node=selector,
)
# Result: (-1, -1, -1, 10)
```
Variadic Ports (Fan-in)¶
Multiple connections can feed a single port. Wrapping the `PortSpec` in a list marks the port as variadic; the node's `forward()` then receives a list of tensors.

```python
class AggregatorNode(Node):
    INPUT_SPECS = {
        "features": [PortSpec(  # list wrapper marks the port as variadic
            dtype=torch.float32,
            shape=(-1, -1),
            description="Feature tensors to aggregate",
        )]
    }

    def forward(self, features: list[torch.Tensor], **_):
        concatenated = torch.cat(features, dim=-1)
        return {"aggregated": concatenated}

# Connect multiple sources to the same port
pipeline.connect(
    (node1.feat1, aggregator.features),
    (node2.feat2, aggregator.features),
    (node3.feat3, aggregator.features),
)
```
Data Flow Patterns¶
Pattern 1: Linear (Single Input/Output)¶
```mermaid
graph LR
    A[DataLoader] -->|data| B[Normalizer]
    B -->|normalized| C[Analyzer]
    C -->|scores| D[Output]
    style A fill:#e1f5ff
    style D fill:#d4edda
```

```python
pipeline.connect(
    (loader.data, normalizer.data),
    (normalizer.normalized, analyzer.data),
    (analyzer.scores, output_node.data),
)
```
Pattern 2: Fan-in (Multiple Sources)¶
```mermaid
graph LR
    A[Source A] -->|features_a| C[Merger]
    B[Source B] -->|features_b| C
    C -->|merged| D[Downstream]
    style A fill:#e1f5ff
    style B fill:#e1f5ff
    style D fill:#d4edda
```
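A minimal sketch of this pattern using a variadic target port (node names are illustrative; see Variadic Ports above):

```python
# Both sources connect to the same variadic "features" port on the merger
pipeline.connect(
    (source_a.features_a, merger.features),
    (source_b.features_b, merger.features),
)
```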
Pattern 3: Fan-out (Multiple Targets)¶
```mermaid
graph LR
    A[Preprocessor] -->|data| B[Branch A]
    A -->|data| C[Branch B]
    A -->|data| D[Branch C]
    style A fill:#e1f5ff
    style B fill:#d4edda
    style C fill:#d4edda
    style D fill:#d4edda
```

```python
pipeline.connect(
    (preprocessor.data, branch_a.data),
    (preprocessor.data, branch_b.data),
    (preprocessor.data, branch_c.data),
)
```
Pattern 4: Multi-Port Node¶
```mermaid
graph LR
    A[Analyzer] -->|scores| B[Decider]
    A -->|embeddings| C[Visualizer]
    A -->|metadata| D[Logger]
    style A fill:#f3e5f5
    style B fill:#d4edda
    style C fill:#d4edda
    style D fill:#d4edda
```

```python
pipeline.connect(
    (analyzer.scores, decider.predictions),
    (analyzer.embeddings, visualizer.features),
    (analyzer.metadata, logger.data),
)
```
Pattern 5: Complete Pipeline Example¶
This diagram shows a real anomaly detection pipeline with port-based data flow, including processing nodes, loss computation, and visualization:
```mermaid
graph LR
    A[Input Batch] --> B[Port Distribution]
    B --> C[MinMaxNormalizer.data]
    C --> D[MinMaxNormalizer.normalized]
    D --> E[SoftChannelSelector.data]
    E --> F[SoftChannelSelector.selected]
    F --> G[TrainablePCA.features]
    G --> H[TrainablePCA.projected]
    H --> I[RXGlobal.data]
    I --> J[RXGlobal.scores]
    E --> K[SelectorEntropyReg.weights]
    F --> L[SelectorDiversityReg.weights]
    J --> M[AnomalyBCEWithLogits.predictions]
    J --> N[AnomalyHeatmap.data]
    K --> O[SelectorEntropyReg.loss]
    L --> P[SelectorDiversityReg.loss]
    M --> Q[AnomalyBCEWithLogits.loss]
    O --> R[GradientTrainer loss_nodes]
    P --> R
    Q --> R
    N --> S[VisualizationManager.inputs]
    R --> T[Backward Pass]
    S --> U[Monitor Output]
    style A fill:#e1f5ff
    style T fill:#ffc107
    style U fill:#2196f3
```
Data Flow Explanation:
- Main Processing Chain: Input → MinMaxNormalizer → SoftChannelSelector → TrainablePCA → RXGlobal → Anomaly Scores
- Regularization Branch: Selector weights feed into entropy and diversity regularizers
- Loss Aggregation: Three loss nodes (BCE, entropy, diversity) are registered with GradientTrainer
- Visualization Branch: Scores and heatmaps feed into monitoring system
Code Example:
```python
from cuvis_ai.node.normalization import MinMaxNormalizer
from cuvis_ai.node.selector import SoftChannelSelector
from cuvis_ai.node.pca import TrainablePCA
from cuvis_ai.anomaly.rx_detector import RXGlobal
from cuvis_ai.node.losses import (
    AnomalyBCEWithLogits,
    SelectorEntropyRegularizer,
    SelectorDiversityRegularizer,
)
from cuvis_ai.node.visualizations import AnomalyHeatmap

# Create nodes
normalizer = MinMaxNormalizer(eps=1e-6)
selector = SoftChannelSelector(n_select=15, input_channels=61)
pca = TrainablePCA(n_components=10, input_dim=15)
rx = RXGlobal(num_channels=10)
bce_loss = AnomalyBCEWithLogits(name="bce", weight=10.0)
entropy_loss = SelectorEntropyRegularizer(name="entropy", weight=0.1)
diversity_loss = SelectorDiversityRegularizer(name="diversity", weight=0.01)
heatmap = AnomalyHeatmap(name="heatmap")

# Connect pipeline
pipeline.connect(
    # Main processing chain
    (data_node.cube, normalizer.data),
    (normalizer.normalized, selector.data),
    (selector.selected, pca.features),
    (pca.projected, rx.data),
    # Loss connections
    (rx.scores, bce_loss.predictions),
    (data_node.mask, bce_loss.targets),
    (selector.weights, entropy_loss.weights),
    (selector.weights, diversity_loss.weights),
    # Visualization
    (rx.scores, heatmap.scores),
)

# Register loss nodes with the trainer
grad_trainer = GradientTrainer(
    pipeline=pipeline,
    datamodule=datamodule,
    loss_nodes=[bce_loss, entropy_loss, diversity_loss],
    # ... additional trainer arguments
)
```
Advanced Features¶
Optional Ports¶
```python
class FlexibleNode(Node):
    INPUT_SPECS = {
        "data": PortSpec(..., optional=False),
        "mask": PortSpec(..., optional=True),
    }

    def forward(self, data: torch.Tensor, mask: torch.Tensor | None = None, **_):
        if mask is not None:
            processed = data * mask.unsqueeze(-1)
        else:
            processed = data
        return {"processed": processed}

# "mask" can be left unconnected
pipeline.connect((source.data, node.data))
```
Dynamic Port Resolution¶
```python
class AdaptiveNode(Node):
    def __init__(self, output_dim: int, **kwargs):
        self.output_dim = output_dim  # Plain attribute resolves "output_dim"
        super().__init__(**kwargs)

    INPUT_SPECS = {
        "features": PortSpec(torch.float32, (-1, "input_dim"))
    }
    OUTPUT_SPECS = {
        "transformed": PortSpec(torch.float32, (-1, "output_dim"))
    }

    @property
    def input_dim(self):
        # Properties resolve symbolic dimensions too; this assumes the node
        # builds self.linear (e.g., an nn.Linear) during initialization
        return self.linear.in_features
```
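Resolution then works the same way as for plain attributes; a minimal sketch reusing `DimensionResolver` from above (the value 8 is illustrative):

```python
from cuvis_ai.pipeline.ports import DimensionResolver

node = AdaptiveNode(output_dim=8)
resolved = DimensionResolver.resolve(shape=(-1, "output_dim"), node=node)
# resolved == (-1, 8); "input_dim" resolves via the property once
# the node's linear layer exists
```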
Best Practices¶
- **Descriptive Port Names**

```python
OUTPUT_SPECS = {
    "anomaly_scores": PortSpec(...),
    "feature_embeddings": PortSpec(...),
    "class_probabilities": PortSpec(...),
}
```
- **Explicit Data Types** (see the sketch below)
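A sketch of what this means in practice: prefer concrete dtypes over the generic `torch.Tensor`:

```python
INPUT_SPECS = {
    "data": PortSpec(dtype=torch.float32, shape=(-1, -1, -1, -1)),
    "mask": PortSpec(dtype=torch.bool, shape=(-1, -1, -1)),
    # Avoid dtype=torch.Tensor unless the port truly accepts any dtype
}
```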
- **Document Port Requirements**

```python
INPUT_SPECS = {
    "hyperspectral_cube": PortSpec(
        dtype=torch.float32,
        shape=(-1, -1, -1, 61),
        description="""
        Format: BHWC (Batch, Height, Width, Channels)
        Channels: 61 spectral bands (400-1000nm)
        Normalization: Values in [0, 1]
        """,
    )
}
```
- **Validate in `forward()`**

```python
def forward(self, data: torch.Tensor, **_):
    if data.ndim != 4:
        raise ValueError(f"Expected 4D tensor (BHWC), got {data.ndim}D")
    if data.shape[-1] != self.expected_channels:
        raise ValueError(
            f"Expected {self.expected_channels} channels, got {data.shape[-1]}"
        )
    if data.min() < 0 or data.max() > 1:
        raise ValueError("Data must be in [0, 1] range")
```
- **Handle Optional Ports Gracefully**

```python
def forward(
    self,
    data: torch.Tensor,
    mask: torch.Tensor | None = None,
    weights: torch.Tensor | None = None,
    **_,
):
    if mask is None:
        mask = torch.ones_like(data[..., 0], dtype=torch.bool)
    if weights is None:
        weights = torch.ones(data.shape[-1], device=data.device)
    return self._process(data, mask, weights)
```
Troubleshooting¶
Type Mismatch¶
```text
PortCompatibilityError: Cannot connect ports: dtype mismatch
  Source: normalizer (torch.float32)
  Target: analyzer (torch.float64)
```

Fix: Use the same dtype on both sides of the connection (see the sketch below).
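One possible fix, assuming the target node is yours to change, is aligning its spec with the upstream dtype:

```python
class Analyzer(Node):
    INPUT_SPECS = {
        # Was torch.float64; aligned with the normalizer's float32 output
        "data": PortSpec(dtype=torch.float32, shape=(-1, -1, -1, -1)),
    }
```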
Shape Mismatch¶
```text
PortCompatibilityError: Cannot connect ports: shape mismatch
  Source: selector (shape: (-1, -1, -1, 10))
  Target: rx_node (shape: (-1, -1, -1, 61))
```

Fix: Update the node configuration so the connected shapes agree (see the sketch below).
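Using the nodes from the pipeline example above, the downstream channel count must match the upstream output:

```python
pca = TrainablePCA(n_components=10, input_dim=15)
rx = RXGlobal(num_channels=10)  # must equal pca's n_components
```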
Missing Required Port¶
Fix: Connect every port declared with `optional=False`, as shown below.
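A sketch with illustrative node names:

```python
# "data" was declared with optional=False, so it must be wired up
pipeline.connect((source_node.scores, target_node.data))
```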
Symbolic Dimension Not Found¶
Fix: Store the attribute before calling `super().__init__()`:

```python
class SelectorNode(Node):
    def __init__(self, n_channels: int, **kwargs):
        self.n_channels = n_channels  # MUST be set before super().__init__()
        super().__init__(**kwargs)

    OUTPUT_SPECS = {
        "selected": PortSpec(torch.float32, (-1, -1, -1, "n_channels"))
    }
```
Related Documentation¶
- → Node System Deep Dive - Node I/O and port usage
- → Pipeline Lifecycle - Node integration in pipelines
- → Two-Phase Training - Statistical initialization
- → Building Pipelines (Python) - Practical construction
- → Building Pipelines (YAML) - Configuration-based construction