Status: Needs Review
This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.
Client Workflows & Error Handling¶
Configuration, training, inference, and error handling patterns for CUVIS.AI gRPC clients.
Configuration Patterns¶
Standard Config Resolution¶
```python
from cuvis_ai.utils.grpc_workflow import (
    resolve_trainrun_config,
    apply_trainrun_config,
)

# Resolve trainrun config with overrides
resolved, config_dict = resolve_trainrun_config(
    stub,
    session_id,
    "deep_svdd",
    overrides=[
        "training.trainer.max_epochs=50",
        "training.optimizer.lr=0.0005",
        "data.batch_size=8",
    ],
)

# Apply to session
apply_trainrun_config(stub, session_id, resolved.config_bytes)

# Inspect resolved config
print(f"Pipeline: {config_dict['pipeline']['name']}")
print(f"Optimizer: {config_dict['training']['optimizer']['name']}")
print(f"Learning rate: {config_dict['training']['optimizer']['lr']}")
```
Config Validation Before Training¶
```python
import json

from cuvis_ai.utils.grpc_workflow import format_progress

def validate_and_train(stub, session_id, trainrun_name, overrides=None):
    """Resolve, validate, and train with error handling."""
    from cuvis_ai.utils.grpc_workflow import resolve_trainrun_config

    # Resolve config
    resolved, config_dict = resolve_trainrun_config(
        stub, session_id, trainrun_name, overrides=overrides
    )

    # Validate training config
    training_json = json.dumps(config_dict["training"]).encode("utf-8")
    validation = stub.ValidateConfig(
        cuvis_ai_pb2.ValidateConfigRequest(
            config_type="training",
            config_bytes=training_json,
        )
    )

    if not validation.valid:
        print("Training configuration validation failed:")
        for error in validation.errors:
            print(f"  ERROR: {error}")
        raise ValueError("Invalid training configuration")

    # Show warnings
    for warning in validation.warnings:
        print(f"  WARNING: {warning}")

    # Apply config and train
    stub.SetTrainRunConfig(
        cuvis_ai_pb2.SetTrainRunConfigRequest(
            session_id=session_id,
            config=cuvis_ai_pb2.TrainRunConfig(config_bytes=resolved.config_bytes),
        )
    )

    # Train with validated config
    for progress in stub.Train(
        cuvis_ai_pb2.TrainRequest(
            session_id=session_id,
            trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
        )
    ):
        yield progress

# Usage
for progress in validate_and_train(stub, session_id, "deep_svdd"):
    print(format_progress(progress))
```
Training Patterns¶
Two-Phase Training Pattern¶
```python
from cuvis_ai.utils.grpc_workflow import format_progress

def two_phase_training(stub, session_id):
    """Standard two-phase training: statistical then gradient."""
    # Phase 1: Statistical initialization (fast, no backprop)
    print("=== Phase 1: Statistical Training ===")
    for progress in stub.Train(
        cuvis_ai_pb2.TrainRequest(
            session_id=session_id,
            trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
        )
    ):
        print(f"[Statistical] {format_progress(progress)}")

    # Phase 2: Gradient-based fine-tuning (full training)
    print("\n=== Phase 2: Gradient Training ===")
    for progress in stub.Train(
        cuvis_ai_pb2.TrainRequest(
            session_id=session_id,
            trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
        )
    ):
        print(f"[Gradient] {format_progress(progress)}")

    print("\n=== Training Complete ===")

# Usage
two_phase_training(stub, session_id)
```
Progress Monitoring with Early Stopping¶
```python
def train_with_early_stopping(stub, session_id, trainer_type, patience=5):
    """Train with custom early stopping logic."""
    best_metric = float('-inf')
    patience_counter = 0

    for progress in stub.Train(
        cuvis_ai_pb2.TrainRequest(
            session_id=session_id,
            trainer_type=trainer_type,
        )
    ):
        # Print all progress
        print(format_progress(progress))

        # Check validation metrics for early stopping
        if progress.context.stage == cuvis_ai_pb2.EXECUTION_STAGE_VAL:
            current_metric = progress.metrics.get('val_iou', 0.0)
            if current_metric > best_metric:
                best_metric = current_metric
                patience_counter = 0
                print(f"  → New best metric: {best_metric:.4f}")
            else:
                patience_counter += 1
                print(f"  → No improvement (patience: {patience_counter}/{patience})")

            if patience_counter >= patience:
                print(f"\n=== Early stopping triggered (patience={patience}) ===")
                break

    print(f"\nBest validation metric: {best_metric:.4f}")

# Usage
train_with_early_stopping(stub, session_id, cuvis_ai_pb2.TRAINER_TYPE_GRADIENT)
```
Checkpoint Saving Strategy¶
```python
def train_with_checkpoints(stub, session_id, trainer_type, checkpoint_every_n_epochs=10):
    """Save checkpoints periodically during training."""
    from pathlib import Path

    output_dir = Path("outputs/checkpoints")
    output_dir.mkdir(parents=True, exist_ok=True)
    epoch_counter = 0

    for progress in stub.Train(
        cuvis_ai_pb2.TrainRequest(
            session_id=session_id,
            trainer_type=trainer_type,
        )
    ):
        print(format_progress(progress))

        # Save a checkpoint every N epochs
        if progress.context.stage == cuvis_ai_pb2.EXECUTION_STAGE_TRAIN:
            current_epoch = progress.context.epoch
            if current_epoch > epoch_counter:
                epoch_counter = current_epoch
                if epoch_counter % checkpoint_every_n_epochs == 0:
                    checkpoint_path = output_dir / f"checkpoint_epoch_{epoch_counter}.yaml"
                    print(f"\n  → Saving checkpoint at epoch {epoch_counter}...")
                    response = stub.SavePipeline(
                        cuvis_ai_pb2.SavePipelineRequest(
                            session_id=session_id,
                            pipeline_path=str(checkpoint_path),
                        )
                    )
                    print(f"  → Saved: {response.pipeline_path}\n")

# Usage
train_with_checkpoints(stub, session_id, cuvis_ai_pb2.TRAINER_TYPE_GRADIENT)
```
Inference Patterns¶
Single Inference Pattern¶
```python
from cuvis_ai_core.grpc import helpers
import numpy as np

def run_inference(stub, session_id, cube, wavelengths):
    """Run inference on a single hyperspectral cube."""
    # Run inference
    response = stub.Inference(
        cuvis_ai_pb2.InferenceRequest(
            session_id=session_id,
            inputs=cuvis_ai_pb2.InputBatch(
                cube=helpers.numpy_to_proto(cube),
                wavelengths=helpers.numpy_to_proto(wavelengths),
            ),
        )
    )

    # Convert outputs
    outputs = {
        name: helpers.proto_to_numpy(tensor_proto)
        for name, tensor_proto in response.outputs.items()
    }
    return outputs

# Usage
cube = np.random.rand(1, 32, 32, 61).astype(np.float32)
wavelengths = np.linspace(430, 910, 61).reshape(1, -1).astype(np.float32)
outputs = run_inference(stub, session_id, cube, wavelengths)
print(f"Outputs: {list(outputs.keys())}")
```
Batch Inference Pattern¶
```python
from cuvis_ai_core.data.datasets import SingleCu3sDataModule

def batch_inference(stub, session_id, cu3s_path, batch_size=4):
    """Efficient batch inference on a CU3S dataset."""
    from cuvis_ai_core.grpc import helpers

    # Create data loader
    datamodule = SingleCu3sDataModule(
        cu3s_file_path=cu3s_path,
        batch_size=batch_size,
        processing_mode="Reflectance",
    )
    datamodule.setup(stage="test")
    dataloader = datamodule.test_dataloader()

    # Run inference on each batch
    all_results = []
    for batch_idx, batch in enumerate(dataloader):
        print(f"Processing batch {batch_idx + 1}/{len(dataloader)}")
        response = stub.Inference(
            cuvis_ai_pb2.InferenceRequest(
                session_id=session_id,
                inputs=cuvis_ai_pb2.InputBatch(
                    cube=helpers.tensor_to_proto(batch["cube"]),
                    wavelengths=helpers.tensor_to_proto(batch["wavelengths"]),
                ),
            )
        )

        # Convert and store outputs
        batch_outputs = {
            name: helpers.proto_to_numpy(tensor_proto)
            for name, tensor_proto in response.outputs.items()
        }
        all_results.append(batch_outputs)

    return all_results

# Usage
results = batch_inference(stub, session_id, "data/Lentils_000.cu3s", batch_size=4)
print(f"Processed {len(results)} batches")
```
Output Filtering Pattern¶
```python
def filtered_inference(stub, session_id, cube, wavelengths, output_specs):
    """Inference with output filtering (reduces payload)."""
    from cuvis_ai_core.grpc import helpers

    response = stub.Inference(
        cuvis_ai_pb2.InferenceRequest(
            session_id=session_id,
            inputs=cuvis_ai_pb2.InputBatch(
                cube=helpers.numpy_to_proto(cube),
                wavelengths=helpers.numpy_to_proto(wavelengths),
            ),
            output_specs=output_specs,  # Only request specific outputs
        )
    )

    outputs = {
        name: helpers.proto_to_numpy(tensor_proto)
        for name, tensor_proto in response.outputs.items()
    }
    return outputs

# Usage - only request the decider's decisions and the detector's scores
outputs = filtered_inference(
    stub,
    session_id,
    cube,
    wavelengths,
    output_specs=["decider.decisions", "detector.scores"],
)
```
Error Handling¶
Comprehensive Error Handling¶
```python
import grpc

def safe_inference(stub, session_id, cube, wavelengths):
    """Inference with comprehensive error handling."""
    from cuvis_ai_core.grpc import helpers

    try:
        response = stub.Inference(
            cuvis_ai_pb2.InferenceRequest(
                session_id=session_id,
                inputs=cuvis_ai_pb2.InputBatch(
                    cube=helpers.numpy_to_proto(cube),
                    wavelengths=helpers.numpy_to_proto(wavelengths),
                ),
            )
        )
        return {
            name: helpers.proto_to_numpy(tensor_proto)
            for name, tensor_proto in response.outputs.items()
        }
    except grpc.RpcError as e:
        code = e.code()
        details = e.details()
        if code == grpc.StatusCode.INVALID_ARGUMENT:
            print(f"Invalid inference request: {details}")
        elif code == grpc.StatusCode.NOT_FOUND:
            print(f"Session not found: {details}")
        elif code == grpc.StatusCode.FAILED_PRECONDITION:
            print(f"Cannot run inference: {details}")
        elif code == grpc.StatusCode.RESOURCE_EXHAUSTED:
            print(f"Resource exhausted: {details}")
        else:
            print(f"gRPC error: {code} - {details}")
        raise
```
Best Practices Summary¶
- Always close sessions - Use try/finally or context managers
- Configure message size - Set 600 MB limits for hyperspectral data
- Validate configs early - Use `ValidateConfig` before training
- Filter outputs - Specify `output_specs` to reduce payload
- Handle errors gracefully - Check gRPC status codes
- Use helper functions - Leverage `grpc_workflow.py`
- Monitor training progress - Process streaming updates
- Save checkpoints - Periodic `SavePipeline` during training
- Reuse channels - Don't create new channels for each request
- Enable TLS in production - Use `ssl_channel_credentials` (see the sketch below)
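A minimal sketch combining the channel-related practices above (one reused TLS channel with raised message limits), using only standard `grpc` APIs; the hostname is illustrative, and `ssl_channel_credentials()` falls back to the system's default CA roots:

```python
import grpc

MAX_MESSAGE_SIZE = 600 * 1024 * 1024  # 600 MB for large hyperspectral payloads

# Create one TLS channel up front and reuse it for all stubs and requests
channel = grpc.secure_channel(
    "cuvis-server.example.com:50051",  # illustrative hostname
    grpc.ssl_channel_credentials(),    # default system CA roots
    options=[
        ("grpc.max_send_message_length", MAX_MESSAGE_SIZE),
        ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE),
    ],
)
```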
Troubleshooting¶
Connection Refused¶
Error: `grpc._channel._InactiveRpcError: failed to connect to all addresses`
Solutions:
- Verify server is running: `ps aux | grep production_server`
- Check port: `netstat -an | grep 50051`
- Test connectivity: `telnet localhost 50051`
- Check firewall (if remote): `sudo ufw allow 50051`
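The same connectivity check can also be done from Python using gRPC's channel readiness future (standard `grpc` API):

```python
import grpc

channel = grpc.insecure_channel("localhost:50051")
try:
    # Block until the channel is connected, or fail after 5 seconds
    grpc.channel_ready_future(channel).result(timeout=5)
    print("Server reachable")
except grpc.FutureTimeoutError:
    print("Could not reach the server - is it listening on port 50051?")
```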
Message Size Exceeded¶
Error: `grpc._channel._MultiThreadedRendezvous: Received message larger than max`
Solution: raise the gRPC message size limits on the client channel (the server must allow matching limits).
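A minimal sketch using standard gRPC channel options and the 600 MB limit recommended in the best practices above:

```python
import grpc

MAX_MESSAGE_SIZE = 600 * 1024 * 1024  # 600 MB, as recommended for hyperspectral data

channel = grpc.insecure_channel(
    "localhost:50051",
    options=[
        ("grpc.max_send_message_length", MAX_MESSAGE_SIZE),
        ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE),
    ],
)
```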
Session Not Found¶
Error: `Session ID not found`
Solutions:
- Session expired (1-hour timeout) - create new session
- Server restarted - sessions are not persisted
- Verify session ID is correct
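Where it fits the workflow, expired sessions can also be handled in code. A sketch reusing `run_inference` from above; `make_session` stands in for a caller-supplied callable that creates a new session (see Client Connections & Sessions) and is an assumption of this sketch:

```python
import grpc

def inference_with_session_retry(stub, make_session, session_id, cube, wavelengths):
    """Retry once with a fresh session if the current one has expired."""
    try:
        return run_inference(stub, session_id, cube, wavelengths)
    except grpc.RpcError as e:
        if e.code() == grpc.StatusCode.NOT_FOUND:
            # Session expired or server restarted - create a new one and retry
            session_id = make_session()  # hypothetical helper, see note above
            return run_inference(stub, session_id, cube, wavelengths)
        raise
```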
CUDA Out of Memory¶
Error: `CUDA out of memory`
Solutions:
- Reduce batch size in data config
- Close unused sessions
- Restart server to clear GPU memory
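For the first solution, the batch size can be lowered without editing config files by re-resolving the trainrun config with an override, as in the configuration patterns above (the value 2 is illustrative):

```python
from cuvis_ai.utils.grpc_workflow import resolve_trainrun_config, apply_trainrun_config

# Re-resolve the trainrun config with a smaller batch size and re-apply it
resolved, _ = resolve_trainrun_config(
    stub,
    session_id,
    "deep_svdd",
    overrides=["data.batch_size=2"],  # illustrative value; pick what fits your GPU
)
apply_trainrun_config(stub, session_id, resolved.config_bytes)
```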
Plugin Management¶
CUVIS.AI supports a plugin system for extending functionality. Plugins can be loaded dynamically at runtime to add custom nodes, data sources, or processing capabilities.
For comprehensive plugin system documentation, see:
- Plugin System Overview - Architecture and core concepts
- Plugin Nodes - Available plugins and loading examples
- Plugin Development Guide - Creating custom plugins
gRPC Integration: Plugins loaded on the server side are automatically available to all gRPC clients. Use the discovery RPCs to query available capabilities after plugins are loaded.
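As a client-side illustration only, a capability query after plugin load might look like the sketch below. The RPC and message names (`ListNodeTypes`, `ListNodeTypesRequest`) are placeholders, not confirmed API; see the gRPC API reference for the actual discovery RPCs.

```python
# Hypothetical discovery call - RPC and message names are placeholders
response = stub.ListNodeTypes(cuvis_ai_pb2.ListNodeTypesRequest())
for node_type in response.node_types:
    print(node_type)
```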
See Also¶
- Client Connections & Sessions - Connection management and session lifecycle patterns
- gRPC API Reference: Sessions - Session RPC method documentation
- gRPC API Reference: Config - Configuration RPC method documentation
- gRPC API Reference: Training & Inference - Training and inference RPC documentation
- gRPC API Reference: Pipeline - Pipeline RPC method documentation
- gRPC Overview - Architecture and concepts
- Sequence Diagrams - Visual workflows
- gRPC Tutorial - Hands-on tutorial
- Deployment Guide - Production patterns