Node System Deep Dive¶
Fundamental processing units in CUVIS.AI pipelines.
A Node represents a single processing unit in a pipeline. Each node performs a specific task, declares typed input/output ports, manages internal state, and supports both CPU and GPU execution.
Key capabilities:
- Typed I/O via port specifications
- Optional statistical initialization from data
- Freeze/unfreeze for two-phase training
- Stage-aware execution control
- Serialization and restoration
Node Lifecycle¶
Complete lifecycle from creation to cleanup:
flowchart TB
A[Node Creation] --> B[Port Initialization]
B --> C{Requires Statistical<br/>Initialization?}
C -->|Yes| D[statistical_initialization<br/>from data]
C -->|No| E[Ready for Use]
D --> F{Convert to<br/>Trainable?}
F -->|Yes| G[unfreeze<br/>buffers → parameters]
F -->|No| H[Use Frozen<br/>Statistics]
G --> I[Gradient Training]
H --> E
I --> E
E --> J[Forward Pass<br/>Execution]
J --> K[Output Generation]
K --> L{More Data?}
L -->|Yes| J
L -->|No| M[Serialization<br/>save state_dict]
M --> N[Cleanup<br/>release resources]
style A fill:#e1f5ff
style D fill:#fff3cd
style G fill:#ffe66d
style J fill:#f3e5f5
style M fill:#d4edda
style N fill:#ffc107
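In code, a single pass through this lifecycle looks roughly like the following sketch (pipeline, datamodule, and test_data are assumed to already exist; each API is covered in the sections below):

from cuvis_ai_core.training import StatisticalTrainer
from cuvis_ai.anomaly.rx_detector import RXGlobal

# 1. Creation - ports are built from INPUT_SPECS / OUTPUT_SPECS
rx = RXGlobal(num_channels=61)
pipeline.add_node(rx)

# 2. Statistical initialization - statistics stored as frozen buffers
StatisticalTrainer(pipeline=pipeline, datamodule=datamodule).fit()

# 3. Optional - unfreeze (buffers -> parameters) for gradient training
pipeline.unfreeze_nodes_by_name([rx.name])

# 4. Forward execution, then serialization
outputs = rx.forward(data=test_data)
state = rx.state_dict()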
Base Node Architecture¶
All nodes inherit from the Node base class, which itself inherits from nn.Module, ABC, and Serializable:

from abc import ABC, abstractmethod
from typing import Any

from torch import nn

from cuvis_ai_schemas.pipeline import PortSpec


class Node(nn.Module, ABC, Serializable):
    """Base class for all nodes (simplified sketch; Serializable is a framework mixin)."""

    # Class-level port specifications
    INPUT_SPECS: dict[str, PortSpec | list[PortSpec]] = {}
    OUTPUT_SPECS: dict[str, PortSpec | list[PortSpec]] = {}

    def __init__(self, name: str | None = None, **kwargs):
        super().__init__()
        self.name = name
        self._input_ports = {}   # Created from INPUT_SPECS
        self._output_ports = {}  # Created from OUTPUT_SPECS

    @abstractmethod
    def forward(self, **inputs) -> dict[str, Any]:
        """Process inputs and return outputs."""
        pass
Key properties:
- requires_initial_fit: Auto-detects whether the node needs statistical initialization
- execution_stages: Controls when the node executes (TRAIN, VAL, TEST, INFERENCE, ALWAYS)
- freezed: Tracks frozen vs. trainable state
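These can be inspected directly on any node instance; a minimal sketch, assuming an RXGlobal node:

from cuvis_ai.anomaly.rx_detector import RXGlobal

rx = RXGlobal(num_channels=61, eps=1e-6)
print(rx.requires_initial_fit)  # True - RX needs statistics before forward()
print(rx.execution_stages)      # Stages in which this node participates
print(rx.freezed)               # True - statistical nodes start frozen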
Common Node Patterns¶
1. Data Loading Pattern¶
Load and validate input data
from cuvis_ai.node.data import LentilsAnomalyDataNode
data_node = LentilsAnomalyDataNode(normal_class_ids=[0, 1])
# Characteristics: Stateless, executes in all stages
2. Processing Pattern¶
Transform and normalize data
from cuvis_ai_core.pipeline.pipeline import CuvisPipeline
from cuvis_ai_core.training import StatisticalTrainer
from cuvis_ai.node.normalization import MinMaxNormalizer
from cuvis_ai.node.data import LentilsAnomalyDataNode
# Create pipeline and add nodes
pipeline = CuvisPipeline("Normalization_Pipeline")
data_node = LentilsAnomalyDataNode(normal_class_ids=[0, 1])
normalizer = MinMaxNormalizer(eps=1e-6, use_running_stats=True)
# Connect nodes
pipeline.connect(
    (data_node.outputs.cube, normalizer.data),
)
# Statistical initialization via trainer
trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
trainer.fit() # Automatically initializes normalizer with statistics
# Characteristics: Can be stateless or stateful
3. Statistical Pattern¶
Anomaly detection using statistical methods
from cuvis_ai_core.pipeline.pipeline import CuvisPipeline
from cuvis_ai_core.training import StatisticalTrainer
from cuvis_ai.anomaly.rx_detector import RXGlobal
from cuvis_ai.node.normalization import MinMaxNormalizer
from cuvis_ai.node.data import LentilsAnomalyDataNode
# Create pipeline with statistical nodes
pipeline = CuvisPipeline("RX_Statistical")
data_node = LentilsAnomalyDataNode(normal_class_ids=[0, 1])
normalizer = MinMaxNormalizer(eps=1e-6, use_running_stats=True)
rx_node = RXGlobal(num_channels=61, eps=1e-6)
# Connect the pipeline
pipeline.connect(
    (data_node.outputs.cube, normalizer.data),
    (normalizer.normalized, rx_node.data),
)
# Phase 1: Statistical initialization
trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
trainer.fit() # Initializes all statistical nodes (normalizer, rx_node)
# Phase 2 (optional): Enable gradient training
pipeline.unfreeze_nodes_by_name([rx_node.name])
Statistical initialization pattern:
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Compute statistics from initialization data."""
    self.reset()
    for batch_data in input_stream:
        data_tensor = batch_data["data"]
        self._update_statistics(data_tensor)  # Welford's algorithm
    # mu/sigma buffers are registered with correct shapes in __init__;
    # update them in place so the node stays serializable (see Best Practice 6)
    self.mu.copy_(computed_mean)          # computed from the accumulated statistics
    self.sigma.copy_(computed_covariance)
    self._statistically_initialized = True
Characteristics: Requires initialization, stores statistics as buffers, can be unfrozen for training
4. Deep Learning Pattern¶
Neural network-based analysis
from cuvis_ai_core.pipeline.pipeline import CuvisPipeline
from cuvis_ai_core.training import StatisticalTrainer, GradientTrainer
from cuvis_ai.anomaly.deep_svdd import (
    DeepSVDDProjection,
    DeepSVDDCenterTracker,
    DeepSVDDScores,
    ZScoreNormalizerGlobal,
)
from cuvis_ai.node.losses import DeepSVDDSoftBoundaryLoss
from cuvis_ai.node.data import LentilsAnomalyDataNode
# Create pipeline with deep learning nodes
pipeline = CuvisPipeline("DeepSVDD")
data_node = LentilsAnomalyDataNode(normal_class_ids=[0, 1])
encoder = ZScoreNormalizerGlobal(num_channels=61, hidden=32, sample_n=100)
projection = DeepSVDDProjection(in_channels=61, rep_dim=16, hidden=[32, 16])
center_tracker = DeepSVDDCenterTracker(rep_dim=16)
loss_node = DeepSVDDSoftBoundaryLoss(name="deepsvdd_loss")
# Connect the pipeline
pipeline.connect(
    (data_node.outputs.cube, encoder.data),
    (encoder.normalized, projection.data),
    (projection.embeddings, center_tracker.embeddings),
    (projection.embeddings, loss_node.embeddings),
    (center_tracker.center, loss_node.center),
)
# Phase 1: Statistical initialization of encoder
stat_trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
stat_trainer.fit()
# Phase 2: Gradient training
pipeline.unfreeze_nodes_by_name([encoder.name])
grad_trainer = GradientTrainer(
    pipeline=pipeline,
    datamodule=datamodule,
    loss_nodes=[loss_node],
    trainer_config=training_config,
)
grad_trainer.fit()
Characteristics: Trainable neural networks, GPU-accelerated, require gradient optimization
Creating Custom Nodes¶
Basic Custom Node Template¶
from cuvis_ai_core.node import Node
from cuvis_ai_schemas.pipeline import PortSpec
import torch
from torch import nn
class MyCustomNode(Node):
    """Custom node for specific processing."""

    INPUT_SPECS = {
        "features": PortSpec(
            dtype=torch.float32,
            shape=(-1, -1),
            description="Input feature vectors",
        )
    }
    OUTPUT_SPECS = {
        "transformed": PortSpec(
            dtype=torch.float32,
            shape=(-1, -1),
            description="Transformed features",
        )
    }

    def __init__(self, input_dim: int, output_dim: int, **kwargs):
        self.input_dim = input_dim
        self.output_dim = output_dim
        super().__init__(input_dim=input_dim, output_dim=output_dim, **kwargs)
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, features: torch.Tensor, **_) -> dict[str, torch.Tensor]:
        """Process input features."""
        transformed = self.linear(features)
        return {"transformed": transformed}
Adding Statistical Initialization (Optional)¶
def statistical_initialization(self, input_stream: InputStream) -> None:
    """Initialize parameters from data."""
    feature_sum = torch.zeros(self.input_dim)
    count = 0
    for batch_data in input_stream:
        features = batch_data["features"]
        feature_sum += features.sum(dim=0)
        count += features.shape[0]
    mean = feature_sum / count
    # Register "running_mean" in __init__ (e.g. torch.zeros(input_dim)) and
    # update it in place here - see Best Practice 6 on serialization
    self.running_mean.copy_(mean)
    self._statistically_initialized = True

def unfreeze(self) -> None:
    """Convert buffers to parameters for gradient training."""
    if hasattr(self, "running_mean"):
        self.running_mean = nn.Parameter(self.running_mean.clone())
    super().unfreeze()
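To verify the two-phase conversion, a minimal check (assuming running_mean was registered in __init__ as the comment above suggests):

node = MyCustomNode(input_dim=64, output_dim=16)
node.unfreeze()
print(isinstance(node.running_mean, nn.Parameter))  # True - now receives gradients
print(node.freezed)                                 # False - node is trainable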
Node Registration¶
from cuvis_ai_core.utils.node_registry import NodeRegistry
@NodeRegistry.register
class MyCustomNode(Node):
    """Now discoverable via NodeRegistry.get("MyCustomNode")."""
    pass
Node Registry and Discovery¶
Built-in node access:
from cuvis_ai_core.utils.node_registry import NodeRegistry
# Get node class
RXGlobal = NodeRegistry.get("RXGlobal")
# List all nodes
all_nodes = NodeRegistry.list_builtin_nodes()
Plugin support:
# Create registry instance
registry = NodeRegistry()
registry.load_plugins("path/to/plugins.yaml")
# Get plugin node
AdaCLIPDetector = registry.get("AdaCLIPDetector")
# Use with PipelineBuilder
from cuvis_ai.pipeline.pipeline_builder import PipelineBuilder
builder = PipelineBuilder(node_registry=registry)
Plugin configuration (plugins.yaml):
plugins:
  adaclip:
    repo: "https://github.com/cubert-hyperspectral/cuvis-ai-adaclip.git"
    tag: "v0.1.0"
    provides:
      - cuvis_ai_adaclip.node.adaclip_node.AdaCLIPDetector
Resolution order: Instance plugins → Built-in registry → Import from module path
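A sketch of how this fallback might look in practice (the dotted module-path syntax in the last call is an assumption, not confirmed API):

registry = NodeRegistry()
registry.load_plugins("plugins.yaml")

registry.get("AdaCLIPDetector")            # 1. Resolved from instance plugins
registry.get("RXGlobal")                   # 2. Falls back to the built-in registry
registry.get("my_pkg.nodes.MyCustomNode")  # 3. Imported from a module path (assumed)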
State Management¶
Buffers vs Parameters¶
Parameters are trainable (receive gradients). Buffers are non-trainable state. Both are serialized and moved with .to(device).
# Buffers: statistics, running means, constants
self.register_buffer("mean", torch.zeros(num_features))
self.register_buffer("covariance", torch.eye(num_features))
# Parameters: weights, learnable transformations
self.linear = nn.Linear(in_features, out_features)
self.register_parameter("custom_weight", nn.Parameter(torch.randn(10)))
# Two-phase: buffer → parameter
self.register_buffer("scale", torch.ones(1)) # Phase 1: statistical init
self.scale = nn.Parameter(self.scale.clone()) # Phase 2: unfreeze for training
Freeze/Unfreeze Pattern¶
from cuvis_ai_core.training import StatisticalTrainer
# Phase 1: Statistical initialization (frozen by default)
trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
trainer.fit() # Computes statistics, stores as buffers
node.freezed # True - no gradients, values constant
# Phase 2: Unfreeze for gradient training (optional)
node.unfreeze() # Converts buffers → parameters, enables gradient updates
node.freezed # False - now trainable
# Benefits: Fast statistical init + optional gradient refinement
Best Practices¶
1. Keep Nodes Focused¶
Single responsibility - one node, one task. Compose complex behavior from simple nodes.
class NormalizationNode(Node):
    """Normalizes input data to the [0, 1] range."""
    pass

class AnomalyDetectionNode(Node):
    """Detects anomalies using the RX algorithm."""
    pass
2. Trust Port Validation (Don't Over-Validate)¶
Port schema validation is automatic - the pipeline validates data types and shapes. Do NOT duplicate these checks:
# ❌ BAD: Duplicates automatic port validation
def forward(self, data: torch.Tensor, **_):
    if data.ndim != 4:  # Port spec already validates this
        raise ValueError(f"Expected 4D tensor (BHWC), got {data.shape}")
    if data.dtype != torch.float32:  # Port spec already validates this
        raise TypeError(f"Expected float32, got {data.dtype}")
    return {"output": self.process(data)}

# ✅ GOOD: Trust port specs, only check node-specific state
INPUT_SPECS = {
    "data": PortSpec(
        dtype=torch.float32,
        shape=(-1, -1, -1, 61),  # Framework validates this
        description="Hyperspectral cube in BHWC format",
    )
}

def forward(self, data: torch.Tensor, **_):
    # DO check node-specific initialization state (not automatic)
    if not self._statistically_initialized:
        raise RuntimeError(f"{self.__class__.__name__} requires initialization")
    return {"output": self.process(data)}
What's automatic: port shape/dtype validation.
What's manual: statistical initialization checks (node responsibility).
3. Avoid .to() in Forward (Pipeline Handles Device Placement)¶
The pipeline automatically moves nodes, parameters, and data to the correct device when pipeline.to(device) is called. Do NOT use .to() calls in forward():
# ❌ BAD: Manual device placement breaks multi-device training
def forward(self, data: torch.Tensor, **_):
    data = data.to("cuda")             # DON'T DO THIS!
    weights = self.weights.to("cuda")  # DON'T DO THIS!
    result = torch.matmul(data, weights)
    return {"output": result}

# ✅ GOOD: Let the pipeline handle device placement
def forward(self, data: torch.Tensor, **_):
    # data and self.weights are already on the correct device
    result = torch.matmul(data, self.weights)
    return {"output": result}
Why avoid .to() in forward:
- Breaks multi-device training (model parallelism)
- Adds unnecessary overhead (data is already on the correct device)
- The pipeline manages device placement via pipeline.to(device)
- The framework ensures consistency across all nodes
4. Document Port Requirements¶
INPUT_SPECS = {
    "data": PortSpec(
        dtype=torch.float32,
        shape=(-1, -1, -1, 61),  # BHWC with 61 channels
        description="Hyperspectral cube in BHWC format, normalized to [0, 1]",
    )
}
5. Use Context for Training Metadata¶
Context carries training metadata and is passed as a parameter to forward(). It contains stage, epoch, batch_idx, and global_step:
from cuvis_ai_schemas.execution import Context, Metric
from cuvis_ai_schemas.enums import ExecutionStage
# ✅ GOOD: Context passed as parameter
def forward(self, predictions: torch.Tensor, targets: torch.Tensor,
            context: Context) -> dict:
    # Context fields
    stage = context.stage          # ExecutionStage enum: "train", "val", "test", "inference"
    epoch = context.epoch          # Current epoch (0-indexed)
    batch_idx = context.batch_idx  # Current batch in epoch (0-indexed)
    step = context.global_step     # Global step across all epochs (for logging)

    # Use for metrics
    loss_value = self.compute_loss(predictions, targets)
    metric = Metric(
        name="loss",
        value=loss_value,
        stage=stage,
        epoch=epoch,
        batch_idx=batch_idx,
    )

    # Conditional behavior per stage
    if context.stage == ExecutionStage.TRAIN:
        self.update_running_stats(predictions)

    return {"metrics": [metric]}

# ❌ BAD: Manual metadata parameters
def forward(self, predictions: torch.Tensor, targets: torch.Tensor,
            epoch: int, batch_idx: int, stage: str, **_):  # Don't do this
    pass
Context fields:
- stage: ExecutionStage enum ("train", "val", "test", "inference")
- epoch: Current epoch number (0-indexed)
- batch_idx: Batch index within epoch (0-indexed)
- global_step: Global step counter for monitoring (0-indexed)
Common use cases:
- Metric/artifact logging with training metadata
- Conditional behavior per stage (train vs. inference)
- TensorBoard step tracking via global_step
6. Initialize Buffers and Parameters Properly¶
All buffers and parameters MUST be fully initialized in __init__ with correct dimensions. Initializing them as None or empty and rewriting later breaks serialization.
Get all required arguments (like num_channels) upfront in the constructor:
# ❌ BAD: Deferred initialization breaks serialization
class BadNode(Node):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.register_buffer("mu", None)  # Breaks state_dict!
        self.register_buffer("cov", torch.zeros(0))

    def statistical_initialization(self, input_stream: InputStream) -> None:
        num_channels = next(iter(input_stream))["data"].shape[-1]
        self.mu = torch.zeros(num_channels)  # Too late - breaks serialization

# ✅ GOOD: Proper initialization with required arguments
class GoodNode(Node):
    def __init__(self, num_channels: int, **kwargs):
        super().__init__(num_channels=num_channels, **kwargs)
        # Buffers initialized with correct shapes immediately
        self.register_buffer("mu", torch.zeros(num_channels, dtype=torch.float32))
        self.register_buffer("cov", torch.eye(num_channels, dtype=torch.float32))

    def statistical_initialization(self, input_stream: InputStream) -> None:
        for batch in input_stream:
            self._update_statistics(batch["data"])
        # Update in-place (maintains buffer identity)
        self.mu.copy_(computed_mean)
        self.cov.copy_(computed_covariance)
Why: state_dict() and load_state_dict() operate on the buffers registered on the module. A buffer registered as None is omitted from the state dict, and one registered with a placeholder shape fails to load into a freshly constructed node, so serialization breaks.
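The failure is easy to reproduce with plain PyTorch (a plain nn.Module stands in for Node here):

import torch
from torch import nn

class Stats(nn.Module):
    def __init__(self, num_channels: int):
        super().__init__()
        self.register_buffer("mu", torch.zeros(num_channels))

good = Stats(61)
good.load_state_dict(Stats(61).state_dict())  # OK - shapes match

bad = Stats(61)
bad.mu = torch.zeros(32)                      # Reassigned with a different shape
Stats(61).load_state_dict(bad.state_dict())   # RuntimeError: size mismatch for mu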
Troubleshooting¶
"Node not initialized" Error¶
Problem: RuntimeError when using statistical nodes without initialization
Solution: Use StatisticalTrainer to initialize nodes before use
from cuvis_ai_core.training import StatisticalTrainer
rx = RXGlobal(num_channels=61)
pipeline.add_node(rx)
# Initialize using trainer
trainer = StatisticalTrainer(pipeline=pipeline, datamodule=datamodule)
trainer.fit() # Initializes rx node
# Now ready for inference
outputs = rx.forward(data=test_data)
Port Type Mismatch¶
Problem: PortCompatibilityError with dtype mismatch
Solution: Ensure consistent dtypes across pipeline
normalizer = MinMaxNormalizer(dtype=torch.float32)
rx = RXGlobal(num_channels=61, dtype=torch.float32) # Match dtype
Shape Mismatch¶
Problem: ValueError with unexpected input shape
Solution: Check channel dimension matches node expectations
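For example, a node parameterized with a fixed channel count must receive cubes whose last (channel) dimension matches; a minimal sketch:

import torch
from cuvis_ai.anomaly.rx_detector import RXGlobal

cube = torch.rand(4, 128, 128, 61)          # BHWC - channels are the last dimension
rx = RXGlobal(num_channels=cube.shape[-1])  # Keep num_channels in sync with the data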
Related Documentation¶
- → Port System Deep Dive - Node I/O and data flow
- → Pipeline Lifecycle - Node integration in pipelines
- → Two-Phase Training - Statistical initialization and gradient training
- → Node Catalog - Built-in node reference
- → Creating Custom Nodes - Step-by-step tutorial