Status: Needs Review
This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.
Execution Stages¶
Control when nodes execute during pipeline operation via stage-aware graph execution.
Execution stages enable conditional node execution based on context (training, validation, testing, or inference). They are essential for loss nodes (training, validation, and test), metrics (validation/test only), and other stage-specific operations.
Key capabilities:
- Five execution stages: ALWAYS, TRAIN, VAL, TEST, INFERENCE
- Stage-aware filtering in pipeline executor
- Context object carries stage, epoch, batch_idx, global_step
- Default to ALWAYS for core processing nodes
- Restrict stages only when needed
Stage System Overview¶
Every node specifies execution stages via the execution_stages parameter:
```python
from cuvis_ai_core.node import Node
from cuvis_ai_schemas.enums import ExecutionStage


class MyNode(Node):
    def __init__(self, **kwargs):
        super().__init__(
            execution_stages={ExecutionStage.TRAIN, ExecutionStage.VAL},
            **kwargs,
        )
```
Key Concepts:
| Concept | Description |
|---|---|
| ExecutionStage Enum | ALWAYS, TRAIN, VAL, TEST, INFERENCE |
| execution_stages Parameter | Set of stages when node should execute |
| Context Object | Runtime info (stage, epoch, batch_idx) passed to nodes |
| Stage Filtering | Pipeline skips nodes not matching current stage |
| Default | Nodes default to {ExecutionStage.ALWAYS} |
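The filtering rule in the table above can be sketched as a single predicate. This is a simplified stand-in for illustration; the real check lives inside the pipeline executor, and the `ExecutionStage` stub here stands in for `cuvis_ai_schemas.enums.ExecutionStage`:

```python
from enum import Enum, auto


class ExecutionStage(Enum):
    """Local stand-in for cuvis_ai_schemas.enums.ExecutionStage."""
    ALWAYS = auto()
    TRAIN = auto()
    VAL = auto()
    TEST = auto()
    INFERENCE = auto()


def should_execute(node_stages: set, current: ExecutionStage) -> bool:
    """A node runs if it declares ALWAYS or lists the current stage."""
    return ExecutionStage.ALWAYS in node_stages or current in node_stages


# A TRAIN-only node runs in TRAIN but is skipped in VAL:
print(should_execute({ExecutionStage.TRAIN}, ExecutionStage.TRAIN))   # True
print(should_execute({ExecutionStage.TRAIN}, ExecutionStage.VAL))     # False
print(should_execute({ExecutionStage.ALWAYS}, ExecutionStage.VAL))    # True
```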
Execution Flow¶
```mermaid
graph LR
    A[Pipeline.forward<br/>context.stage=TRAIN] --> B{For Each Node}
    B --> C{Node.execution_stages<br/>contains stage?}
    C -->|Yes| D[Execute Node]
    C -->|No| E[Skip Node]
    D --> F[Continue]
    E --> F
    style D fill:#d4edda
    style E fill:#f8d7da
```
The Five Execution Stages¶
1. ALWAYS (Default)¶
Node executes unconditionally in all stages.
Use Cases:
- Core data transformation
- Feature extraction
- Essential preprocessing
```python
class FeatureExtractor(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.ALWAYS}, **kwargs)

    def forward(self, data, **_):
        # Runs during TRAIN, VAL, TEST, INFERENCE
        features = self.extract_features(data)
        return {"features": features}


# Default behavior
node1 = MyNode()  # Equivalent to execution_stages={ExecutionStage.ALWAYS}
```
2. TRAIN¶
Node only executes during training. Use for: data augmentation, dropout, training loss.
```python
class TrainingAugmentation(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.TRAIN}, **kwargs)
```
3. VAL (Validation)¶
Node only executes during validation. Use for: validation metrics, model selection criteria.
```python
class ValidationMetrics(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.VAL}, **kwargs)
```
4. TEST¶
Node only executes during testing. Use for: final metrics, performance benchmarking.
```python
class TestEvaluator(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.TEST}, **kwargs)
```
5. INFERENCE¶
Node only executes during inference/prediction. Use for: production post-processing, output formatting.
```python
class InferencePostProcessor(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.INFERENCE}, **kwargs)
```
Stage-Aware Node Patterns¶
Pattern 1: Single Stage¶
```python
class TrainOnlyAugmentation(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.TRAIN}, **kwargs)
```
Pattern 2: Multiple Stages¶
```python
class MetricsNode(Node):
    def __init__(self, **kwargs):
        # Execute during validation and test only
        super().__init__(
            execution_stages={ExecutionStage.VAL, ExecutionStage.TEST},
            **kwargs,
        )
```
Pattern 3: Training-Aware (Common for Loss)¶
```python
from cuvis_ai.node.losses import LossNode


class MyLoss(LossNode):
    # Auto-configured to {TRAIN, VAL, TEST}
    def forward(self, predictions, targets, **_):
        return {"loss": self.compute_loss(predictions, targets)}
```
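The base-class pattern behind this can be sketched as follows. This is not the real `LossNode` implementation, only an illustration of how a parent class can pre-set stages that subclasses inherit; the `Node` and `ExecutionStage` stubs are local stand-ins:

```python
from enum import Enum, auto


class ExecutionStage(Enum):  # local stand-in for cuvis_ai_schemas.enums.ExecutionStage
    ALWAYS = auto()
    TRAIN = auto()
    VAL = auto()
    TEST = auto()
    INFERENCE = auto()


class Node:  # minimal stand-in for cuvis_ai_core.node.Node
    def __init__(self, execution_stages=None, **kwargs):
        self.execution_stages = execution_stages or {ExecutionStage.ALWAYS}


class LossNodeSketch(Node):
    """Pre-sets {TRAIN, VAL, TEST} so subclasses inherit loss-appropriate stages."""

    def __init__(self, **kwargs):
        kwargs.setdefault(
            "execution_stages",
            {ExecutionStage.TRAIN, ExecutionStage.VAL, ExecutionStage.TEST},
        )
        super().__init__(**kwargs)


class MyLoss(LossNodeSketch):
    pass


print(MyLoss().execution_stages)  # contains TRAIN, VAL, and TEST but not INFERENCE
```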
Pattern 4: Stage-Conditional Behavior¶
```python
class AdaptiveNormalizer(Node):
    def __init__(self, **kwargs):
        super().__init__(
            execution_stages={ExecutionStage.TRAIN, ExecutionStage.INFERENCE},
            **kwargs,
        )

    def forward(self, data, context: Context, **_):
        if context.stage == ExecutionStage.TRAIN:
            self.update_running_stats(data)
        normalized = self.normalize(data)
        return {"normalized": normalized}
```
Pipeline Execution with Context¶
Context Object¶
The Context object carries the runtime information that the pipeline passes to stage-aware nodes.
```python
from cuvis_ai_schemas.enums import ExecutionStage
from cuvis_ai_schemas.execution import Context

context = Context(
    stage=ExecutionStage.TRAIN,  # Current stage
    epoch=5,                     # Training epoch
    batch_idx=42,                # Batch index
    global_step=1337,            # Global training step
)
```
Attributes:
| Attribute | Type | Description |
|---|---|---|
| `stage` | `ExecutionStage` | Current stage |
| `epoch` | `int` | Training epoch (0-indexed) |
| `batch_idx` | `int` | Batch index in epoch (0-indexed) |
| `global_step` | `int` | Global step counter |
Passing Context to Pipeline¶
```python
from cuvis_ai_core.pipeline.pipeline import CuvisPipeline
from cuvis_ai_schemas.enums import ExecutionStage
from cuvis_ai_schemas.execution import Context

pipeline = CuvisPipeline()

# Training
train_context = Context(stage=ExecutionStage.TRAIN, epoch=0, batch_idx=0)
train_outputs = pipeline.forward(batch=train_batch, context=train_context)

# Validation
val_context = Context(stage=ExecutionStage.VAL, epoch=0, batch_idx=0)
val_outputs = pipeline.forward(batch=val_batch, context=val_context)

# Inference
inference_context = Context(stage=ExecutionStage.INFERENCE)
inference_outputs = pipeline.forward(batch=inference_batch, context=inference_context)
```
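In a training loop, a fresh Context is typically built per batch while `global_step` advances monotonically across epochs. The sketch below illustrates only that bookkeeping; the `Context` and `ExecutionStage` classes are simplified local stand-ins for the `cuvis_ai_schemas` originals, and the actual `pipeline.forward` call is elided:

```python
from dataclasses import dataclass
from enum import Enum, auto


class ExecutionStage(Enum):  # local stand-in for cuvis_ai_schemas.enums.ExecutionStage
    ALWAYS = auto()
    TRAIN = auto()
    VAL = auto()
    TEST = auto()
    INFERENCE = auto()


@dataclass
class Context:  # simplified stand-in for cuvis_ai_schemas.execution.Context
    stage: ExecutionStage
    epoch: int = 0
    batch_idx: int = 0
    global_step: int = 0


def run_epochs(num_epochs: int, batches_per_epoch: int):
    """Build a fresh Context per batch; global_step never resets between epochs."""
    contexts = []
    global_step = 0
    for epoch in range(num_epochs):
        for batch_idx in range(batches_per_epoch):
            ctx = Context(stage=ExecutionStage.TRAIN, epoch=epoch,
                          batch_idx=batch_idx, global_step=global_step)
            contexts.append(ctx)
            # pipeline.forward(batch=..., context=ctx) would go here
            global_step += 1
    return contexts


ctxs = run_epochs(num_epochs=2, batches_per_epoch=3)
print(ctxs[-1])  # epoch=1, batch_idx=2, global_step=5
```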
Data Flow Patterns¶
Training vs Inference Paths¶
```mermaid
graph TD
    A[Data Input<br/>ALWAYS] --> B[Augmentation<br/>TRAIN]
    A --> C[Feature Extraction<br/>ALWAYS]
    B --> C
    C --> D[Model<br/>ALWAYS]
    D --> E[Loss<br/>TRAIN/VAL/TEST]
    D --> F[Metrics<br/>VAL/TEST]
    D --> G[Inference Optimizer<br/>INFERENCE]
    style E fill:#d4edda
    style F fill:#d4edda
    style G fill:#f8d7da
```
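To make the diagram concrete, the stage sets shown above can be simulated to see which nodes are active in each stage. The node names are illustrative and `ExecutionStage` is stubbed locally; the real enum lives in `cuvis_ai_schemas.enums`:

```python
from enum import Enum, auto


class ExecutionStage(Enum):  # local stand-in
    ALWAYS = auto()
    TRAIN = auto()
    VAL = auto()
    TEST = auto()
    INFERENCE = auto()


S = ExecutionStage

# Stage sets from the diagram above (node names are illustrative).
node_stages = {
    "data_input": {S.ALWAYS},
    "augmentation": {S.TRAIN},
    "feature_extraction": {S.ALWAYS},
    "model": {S.ALWAYS},
    "loss": {S.TRAIN, S.VAL, S.TEST},
    "metrics": {S.VAL, S.TEST},
    "inference_optimizer": {S.INFERENCE},
}


def active_nodes(stage):
    """Nodes that would execute for a given context.stage."""
    return [name for name, stages in node_stages.items()
            if S.ALWAYS in stages or stage in stages]


print(active_nodes(S.TRAIN))      # includes augmentation and loss, no metrics
print(active_nodes(S.INFERENCE))  # only the lean path plus inference_optimizer
```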
Best Practices¶
- Use Semantic Stage Selection
```python
# GOOD: Loss computes during training phases
class TrainingLoss(LossNode):
    pass  # Automatically {TRAIN, VAL, TEST}


# BAD: Loss executing during inference
class BadLoss(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.ALWAYS}, **kwargs)
```
- Default to ALWAYS for Core Nodes
```python
# GOOD: Applies everywhere
class FeatureExtractor(Node):
    pass  # Defaults to ALWAYS


# BAD: Unnecessary restriction
class OverRestricted(Node):
    def __init__(self, **kwargs):
        super().__init__(
            execution_stages={ExecutionStage.TRAIN, ExecutionStage.INFERENCE},
            **kwargs,
        )
```
- Separate Concerns with Multiple Nodes
```python
# GOOD: Separate nodes for different behaviors
class TrainingPostProcessor(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.TRAIN}, **kwargs)


class InferencePostProcessor(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.INFERENCE}, **kwargs)
```
- Document Stage Decisions: Add docstrings explaining why a node is restricted to specific stages.
- Test All Stages: Verify pipeline behavior in TRAIN, VAL, TEST, and INFERENCE stages.
Troubleshooting¶
Node Not Executing¶
Diagnosis:
```python
logger.info(f"Node {node.name} stages: {node.execution_stages}")
logger.info(f"Current stage: {context.stage}")
```
Solution: Verify stage match
```python
# Node restricted to TRAIN
node = MyNode(execution_stages={ExecutionStage.TRAIN})

# But running VAL - won't execute!
context = Context(stage=ExecutionStage.VAL)

# Fix: Add VAL to stages
node = MyNode(execution_stages={ExecutionStage.TRAIN, ExecutionStage.VAL})
```
Unexpected Node Execution¶
Solution: Check parent class constructor
```python
class MyMetric(MetricNode):  # Parent sets VAL/TEST
    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # Call parent to preserve stages
        self.threshold = 0.5
```
Loss Not Computing in Validation¶
Solution: Use LossNode base class
```python
from cuvis_ai.node.losses import LossNode


class MyLoss(LossNode):
    # Auto-configured {TRAIN, VAL, TEST}
    def forward(self, predictions, targets, **_):
        return {"loss": self.compute_loss(predictions, targets)}
```
Context Not Available in Node¶
Fix: Add context to INPUT_SPECS
```python
class MyNode(Node):
    INPUT_SPECS = {
        "data": PortSpec(dtype=torch.float32, shape=(-1, -1)),
        "context": PortSpec(dtype=Context, shape=()),  # Add this
    }

    def forward(self, data, context: Context, **_):
        if context.stage == ExecutionStage.TRAIN:
            return {"output": self.train_process(data)}
        return {"output": self.eval_process(data)}
```
Pipeline automatically injects context for nodes declaring it in INPUT_SPECS.
Stage Enum Comparison Failing¶
Solution: Compare enum to enum, not string
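Assuming ExecutionStage is a plain Enum (not a str-Enum), comparing against a bare string silently yields False instead of raising an error; the stub below stands in for the real enum:

```python
from enum import Enum, auto


class ExecutionStage(Enum):  # local stand-in for cuvis_ai_schemas.enums.ExecutionStage
    TRAIN = auto()
    VAL = auto()


stage = ExecutionStage.TRAIN

print(stage == "TRAIN")               # False: an enum member never equals a bare string
print(stage == ExecutionStage.TRAIN)  # True: compare enum to enum
print(stage.name == "TRAIN")          # True: compare .name if a string is unavoidable
```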
Optimization Tips¶
- Minimize Overhead in High-Frequency Stages

```python
# BAD: Expensive visualization in training
class BadVisualizer(Node):
    def __init__(self, **kwargs):
        super().__init__(execution_stages={ExecutionStage.ALWAYS}, **kwargs)

    def forward(self, data, **_):
        expensive_viz = self.render_3d_plot(data)  # Runs every batch!
        return {"visualization": expensive_viz}


# GOOD: Visualization only during validation
class GoodVisualizer(Node):
    def __init__(self, **kwargs):
        super().__init__(
            execution_stages={ExecutionStage.VAL, ExecutionStage.TEST},
            **kwargs,
        )
```

- Reduce Complexity in Inference: Keep diagnostic and debugging nodes out of the INFERENCE stage so the production path runs only essential processing.
- Skip Gradients in Non-Training: Gradient tracking is only needed during TRAIN; disable it in other stages to save memory and compute.
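The gradient tip can be sketched with a stage-conditional context manager. `torch.no_grad` is standard PyTorch; the `ExecutionStage` stub and the `grad_context` helper are illustrative, not part of the framework API:

```python
import contextlib
from enum import Enum, auto

import torch


class ExecutionStage(Enum):  # local stand-in
    TRAIN = auto()
    VAL = auto()
    TEST = auto()
    INFERENCE = auto()


def grad_context(stage: ExecutionStage):
    """Enable gradient tracking only while training; disable it everywhere else."""
    if stage == ExecutionStage.TRAIN:
        return contextlib.nullcontext()
    return torch.no_grad()


x = torch.ones(3, requires_grad=True)

with grad_context(ExecutionStage.TRAIN):
    y_train = x * 2  # part of the autograd graph

with grad_context(ExecutionStage.INFERENCE):
    y_infer = x * 2  # no graph is built, saving memory

print(y_train.requires_grad, y_infer.requires_grad)  # True False
```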