Status: Needs Review
This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.
Remote gRPC Access¶
Access CUVIS.AI pipelines remotely using gRPC for distributed training, inference, and deployment.
Comprehensive gRPC Documentation
For complete gRPC documentation, see:
- gRPC Overview - Introduction and architecture
- gRPC API Reference - Complete documentation of all 46 RPC methods
- Client Patterns - Best practices and common patterns
This guide focuses on detailed examples and deployment scenarios.
Overview¶
CUVIS.AI provides comprehensive gRPC infrastructure for remote pipeline execution:
- Server Deployment: Production-ready server with Docker/Kubernetes support
- Client SDK: Simple API for training, inference, and configuration
- Session Management: Isolated execution contexts with automatic cleanup
- Streaming Updates: Real-time training progress without polling
- Configuration Composition: Hydra integration with dynamic overrides
- Error Handling: Standard gRPC error codes with retry logic
Quick Start¶
Server Setup¶
Start server locally:
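A minimal sketch, using the production_server entry point from the Dockerfile later in this guide:
python -m cuvis_ai.grpc.production_server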
Start server with Docker:
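A sketch, assuming an image built from the Dockerfile below and tagged cuvis-ai-server:
docker run --gpus all -p 50051:50051 cuvis-ai-server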
Default connection: localhost:50051
Basic Client Usage¶
from cuvis_ai_core.grpc import cuvis_ai_pb2, cuvis_ai_pb2_grpc
import grpc
# Connect to server
channel = grpc.insecure_channel("localhost:50051")
stub = cuvis_ai_pb2_grpc.CuvisAIServiceStub(channel)
# Create session
response = stub.CreateSession(cuvis_ai_pb2.CreateSessionRequest())
session_id = response.session_id
# Run inference (example)
inference_response = stub.Inference(
cuvis_ai_pb2.InferenceRequest(
session_id=session_id,
inputs=cuvis_ai_pb2.InputBatch(cube=...),
)
)
# Clean up
stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
gRPC Server Setup¶
Local Development¶
Launch server:
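As in the quick start, a sketch using the production_server entry point; the environment variables listed below adjust its defaults:
GRPC_PORT=50051 python -m cuvis_ai.grpc.production_server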
Default configuration:
- Host: 0.0.0.0 (all network interfaces)
- Port: 50051
- Max workers: 10 (thread pool)
- Max message size: 300 MB
Environment variables:
export GRPC_PORT=50051
export GRPC_MAX_WORKERS=10
export LOG_LEVEL=INFO
export DATA_DIR=/path/to/data
Docker Deployment¶
Dockerfile (production-ready):
FROM cubertgmbh/cuvis_python:3.4.1-ubuntu24.04
WORKDIR /app
# Install dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
# Copy application
COPY cuvis_ai/ cuvis_ai/
COPY configs/ configs/
EXPOSE 50051
# Health check
HEALTHCHECK --interval=30s --timeout=10s \
CMD python -c "import grpc; ..."
CMD ["python", "-m", "cuvis_ai.grpc.production_server"]
Build and run:
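A sketch, using the image tag that the Kubernetes manifest below also references:
docker build -t cuvis-ai-server .
docker run --gpus all -p 50051:50051 cuvis-ai-server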
Docker Compose¶
docker-compose.yml:
version: "3.8"
services:
cuvis-ai-server:
build: .
ports:
- "50051:50051"
environment:
- GRPC_PORT=50051
- GRPC_MAX_WORKERS=10
- LOG_LEVEL=INFO
volumes:
- ./data:/app/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
Start services:
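docker-compose up -d
docker-compose logs -f cuvis-ai-server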
Kubernetes Deployment¶
deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
name: cuvis-ai-server
spec:
replicas: 3
selector:
matchLabels:
app: cuvis-ai-server
template:
metadata:
labels:
app: cuvis-ai-server
spec:
containers:
- name: cuvis-ai-server
image: cuvis-ai-server:latest
ports:
- containerPort: 50051
resources:
limits:
nvidia.com/gpu: 1
memory: 8Gi
requests:
memory: 4Gi
---
apiVersion: v1
kind: Service
metadata:
name: cuvis-ai-service
spec:
selector:
app: cuvis-ai-server
ports:
- protocol: TCP
port: 50051
targetPort: 50051
type: LoadBalancer
Deploy:
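kubectl apply -f deployment.yaml
kubectl get pods -l app=cuvis-ai-server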
Production Configuration¶
TLS/SSL (secure channel):
import grpc
# Server-side
credentials = grpc.ssl_server_credentials([
(open("server.key", "rb").read(),
open("server.crt", "rb").read())
])
server.add_secure_port("[::]:50051", credentials)
# Client-side
channel = grpc.secure_channel(
"production-server:50051",
grpc.ssl_channel_credentials(),
)
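If the server's certificate is signed by a private CA, the client can pass the root certificate explicitly (ca.crt is a placeholder path):
# Sketch: trust a private CA instead of the system roots ("ca.crt" is a placeholder)
with open("ca.crt", "rb") as f:
    ca_cert = f.read()
channel = grpc.secure_channel(
    "production-server:50051",
    grpc.ssl_channel_credentials(root_certificates=ca_cert),
)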
Message size limits:
import grpc
from concurrent import futures
# Server
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=[
("grpc.max_send_message_length", 1024 * 1024 * 1024), # 1 GB
("grpc.max_receive_message_length", 1024 * 1024 * 1024),
],
)
# Client
options = [
("grpc.max_send_message_length", 1024 * 1024 * 1024),
("grpc.max_receive_message_length", 1024 * 1024 * 1024),
]
channel = grpc.insecure_channel("localhost:50051", options=options)
gRPC Client Usage¶
Helper Utilities¶
Recommended approach: Use helper functions from workflow_utils.py:
File: examples/grpc/workflow_utils.py
from cuvis_ai_core.grpc import cuvis_ai_pb2, cuvis_ai_pb2_grpc
import grpc
from pathlib import Path
def build_stub(
server_address: str = "localhost:50051",
max_msg_size: int = 300 * 1024 * 1024
) -> cuvis_ai_pb2_grpc.CuvisAIServiceStub:
"""Create a gRPC stub with configured message limits."""
options = [
("grpc.max_send_message_length", max_msg_size),
("grpc.max_receive_message_length", max_msg_size),
]
channel = grpc.insecure_channel(server_address, options=options)
return cuvis_ai_pb2_grpc.CuvisAIServiceStub(channel)
def config_search_paths(
extra_paths: list[str] | None = None
) -> list[str]:
"""Return absolute search paths for Hydra composition."""
config_root = Path(__file__).parent.parent / "configs"
seeds = [
config_root,
config_root / "trainrun",
config_root / "pipeline",
config_root / "data",
config_root / "training",
]
paths = [str(p.resolve()) for p in seeds]
if extra_paths:
paths.extend(extra_paths)
return paths
def create_session_with_search_paths(
stub: cuvis_ai_pb2_grpc.CuvisAIServiceStub,
search_paths: list[str] | None = None
) -> str:
"""Create session and register config search paths."""
response = stub.CreateSession(cuvis_ai_pb2.CreateSessionRequest())
session_id = response.session_id
paths = search_paths or config_search_paths()
stub.SetSessionSearchPaths(
cuvis_ai_pb2.SetSessionSearchPathsRequest(
session_id=session_id,
search_paths=paths,
append=False,
)
)
return session_id
def resolve_trainrun_config(
stub: cuvis_ai_pb2_grpc.CuvisAIServiceStub,
session_id: str,
name: str,
overrides: list[str] | None = None,
):
"""Resolve trainrun config via Hydra composition."""
import json
response = stub.ResolveConfig(
cuvis_ai_pb2.ResolveConfigRequest(
session_id=session_id,
config_type="trainrun",
path=f"trainrun/{name}",
overrides=overrides or [],
)
)
config_dict = json.loads(response.config_bytes.decode("utf-8"))
return response, config_dict
def apply_trainrun_config(
stub: cuvis_ai_pb2_grpc.CuvisAIServiceStub,
session_id: str,
config_bytes: bytes,
):
"""Apply resolved trainrun config to session."""
stub.SetTrainRunConfig(
cuvis_ai_pb2.SetTrainRunConfigRequest(
session_id=session_id,
config=cuvis_ai_pb2.TrainRunConfig(config_bytes=config_bytes),
)
)
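The training examples below also import a format_progress helper from the same file. Its exact implementation is not reproduced here; a minimal sketch consistent with the progress fields used throughout this page would be:
def format_progress(progress) -> str:
    """Render a streamed training progress message as a one-line summary (sketch)."""
    stage = cuvis_ai_pb2.ExecutionStage.Name(progress.context.stage)
    status = cuvis_ai_pb2.TrainStatus.Name(progress.status)
    parts = [
        f"[{stage}] {status}",
        f"epoch={progress.context.epoch}",
        f"batch={progress.context.batch_idx}",
    ]
    if progress.losses:
        parts.append(f"losses={dict(progress.losses)}")
    if progress.metrics:
        parts.append(f"metrics={dict(progress.metrics)}")
    return " | ".join(parts)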
Session Management¶
Create session:
from workflow_utils import build_stub, create_session_with_search_paths
stub = build_stub("localhost:50051")
session_id = create_session_with_search_paths(stub)
Close session:
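stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))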
Session lifecycle:
- Sessions are isolated (separate pipelines, configs, weights)
- Sessions expire after 1 hour of inactivity
- Always close sessions when done to free resources
Configuration Resolution¶
Resolve config with overrides:
from workflow_utils import resolve_trainrun_config, apply_trainrun_config
# Resolve trainrun config
resolved, config_dict = resolve_trainrun_config(
stub,
session_id,
"rx_statistical", # trainrun name
overrides=[
"data.batch_size=4",
"training.trainer.max_epochs=10",
"training.optimizer.lr=0.001",
],
)
# Apply resolved config
apply_trainrun_config(stub, session_id, resolved.config_bytes)
Available override patterns:
overrides = [
# Training config
"training.trainer.max_epochs=100",
"training.optimizer.lr=0.001",
"training.optimizer.weight_decay=0.01",
"training.scheduler.patience=10",
# Data config
"data.batch_size=16",
"data.train_ids=[0,1,2]",
"data.val_ids=[3,4]",
# Pipeline node params
"pipeline.nodes.channel_selector.params.tau_start=8.0",
"pipeline.nodes.rx_detector.params.eps=1e-6",
]
Training Operations¶
Statistical Training¶
# Run statistical training (streaming progress)
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
)
):
stage = cuvis_ai_pb2.ExecutionStage.Name(progress.context.stage)
status = cuvis_ai_pb2.TrainStatus.Name(progress.status)
epoch = progress.context.epoch
batch = progress.context.batch_idx
print(f"[{stage}] {status} | epoch={epoch} batch={batch}")
if progress.metrics:
metrics = dict(progress.metrics)
print(f" Metrics: {metrics}")
Gradient Training¶
# Run gradient training
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
)
):
stage = cuvis_ai_pb2.ExecutionStage.Name(progress.context.stage)
status = cuvis_ai_pb2.TrainStatus.Name(progress.status)
losses = dict(progress.losses) if progress.losses else {}
metrics = dict(progress.metrics) if progress.metrics else {}
print(f"[{stage}] {status} | losses={losses} | metrics={metrics}")
Two-Phase Training¶
# Phase 1: Statistical initialization
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
)
):
print(f"[Statistical Init] {format_progress(progress)}")
# Phase 2: Gradient training
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
)
):
print(f"[Gradient Training] {format_progress(progress)}")
Inference Operations¶
Basic inference:
from cuvis_ai_core.grpc import helpers
import numpy as np
# Prepare inputs
cube = np.random.rand(1, 32, 32, 61).astype(np.float32)
wavelengths = np.linspace(430, 910, 61).reshape(1, -1).astype(np.int32)
# Run inference
response = stub.Inference(
cuvis_ai_pb2.InferenceRequest(
session_id=session_id,
inputs=cuvis_ai_pb2.InputBatch(
cube=helpers.numpy_to_proto(cube),
wavelengths=helpers.numpy_to_proto(wavelengths),
),
)
)
# Convert outputs
results = {
name: helpers.proto_to_numpy(tensor_proto)
for name, tensor_proto in response.outputs.items()
}
print(f"Outputs: {list(results.keys())}")
print(f"Decisions shape: {results['decider.decisions'].shape}")
Inference with output filtering:
# Request specific outputs only
response = stub.Inference(
cuvis_ai_pb2.InferenceRequest(
session_id=session_id,
inputs=cuvis_ai_pb2.InputBatch(cube=..., wavelengths=...),
output_specs=[
"selector.selected",
"detector.scores",
"decider.decisions",
],
)
)
Batch inference:
from torch.utils.data import DataLoader
from cuvis_ai.datamodule.cu3s_dataset import SingleCu3sDataset
dataset = SingleCu3sDataset(
cu3s_file_path="data/Lentils/Lentils_000.cu3s",
processing_mode="Reflectance",
)
dataloader = DataLoader(dataset, batch_size=1)
for batch in dataloader:
inference_response = stub.Inference(
cuvis_ai_pb2.InferenceRequest(
session_id=session_id,
inputs=cuvis_ai_pb2.InputBatch(
cube=helpers.tensor_to_proto(batch["cube"]),
wavelengths=helpers.tensor_to_proto(batch["wavelengths"]),
),
)
)
# Process results
decisions = helpers.proto_to_numpy(
inference_response.outputs["decider.decisions"]
)
Pipeline Management¶
Load pipeline from config:
import json
# Resolve pipeline config
pipeline_config = stub.ResolveConfig(
cuvis_ai_pb2.ResolveConfigRequest(
session_id=session_id,
config_type="pipeline",
path="pipeline/rx_statistical",
overrides=[],
)
)
# Load pipeline
stub.LoadPipeline(
cuvis_ai_pb2.LoadPipelineRequest(
session_id=session_id,
pipeline=cuvis_ai_pb2.PipelineConfig(
config_bytes=pipeline_config.config_bytes
),
)
)
Load trained weights:
stub.LoadPipelineWeights(
cuvis_ai_pb2.LoadPipelineWeightsRequest(
session_id=session_id,
weights_path="outputs/my_experiment/weights.pt",
strict=True,
)
)
Save pipeline and weights:
save_response = stub.SavePipeline(
cuvis_ai_pb2.SavePipelineRequest(
session_id=session_id,
pipeline_path="outputs/my_pipeline.yaml",
metadata=cuvis_ai_pb2.PipelineMetadata(
name="My Pipeline",
description="Trained anomaly detection pipeline",
),
)
)
print(f"Pipeline saved: {save_response.pipeline_path}")
print(f"Weights saved: {save_response.weights_path}")
Get pipeline specifications:
# Get input specs
inputs_response = stub.GetPipelineInputs(
cuvis_ai_pb2.GetPipelineInputsRequest(session_id=session_id)
)
for name, spec in inputs_response.inputs.items():
dtype = cuvis_ai_pb2.DType.Name(spec.dtype)
print(f"Input: {name} | dtype={dtype} | shape={spec.shape}")
# Get output specs
outputs_response = stub.GetPipelineOutputs(
cuvis_ai_pb2.GetPipelineOutputsRequest(session_id=session_id)
)
for name, spec in outputs_response.outputs.items():
dtype = cuvis_ai_pb2.DType.Name(spec.dtype)
print(f"Output: {name} | dtype={dtype} | shape={spec.shape}")
Restore TrainRun¶
Restore from trainrun file:
restore_response = stub.RestoreTrainRun(
cuvis_ai_pb2.RestoreTrainRunRequest(
trainrun_path="outputs/my_experiment/trainrun.yaml",
weights_path="outputs/my_experiment/weights.pt", # optional
strict=True,
)
)
print(f"Restored session: {restore_response.session_id}")
Continue training from checkpoint:
# Restore trainrun
restore_response = stub.RestoreTrainRun(
cuvis_ai_pb2.RestoreTrainRunRequest(
trainrun_path="outputs/my_experiment/trainrun.yaml",
weights_path="outputs/my_experiment/checkpoints/epoch=10.ckpt",
)
)
# Continue training
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=restore_response.session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
)
):
print(format_progress(progress))
Complete Examples¶
Example 1: Statistical Training¶
File: examples/grpc/statistical_training_client.py
from workflow_utils import (
build_stub, create_session_with_search_paths,
resolve_trainrun_config, apply_trainrun_config, format_progress
)
from cuvis_ai_core.grpc import cuvis_ai_pb2
# Connect to server
stub = build_stub("localhost:50051")
# Create session
session_id = create_session_with_search_paths(stub)
# Resolve trainrun config
resolved, config_dict = resolve_trainrun_config(
stub,
session_id,
"rx_statistical",
overrides=[
"data.batch_size=4",
"training.trainer.max_epochs=1",
"training.seed=42",
],
)
# Apply config
apply_trainrun_config(stub, session_id, resolved.config_bytes)
# Run statistical training
print("Running statistical training...")
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
)
):
print(format_progress(progress))
# Clean up
stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
print("Done!")
Example 2: Gradient Training¶
File: examples/grpc/gradient_training_client.py
from workflow_utils import (
build_stub, create_session_with_search_paths,
resolve_trainrun_config, apply_trainrun_config, format_progress
)
from cuvis_ai_core.grpc import cuvis_ai_pb2
stub = build_stub("localhost:50051")
session_id = create_session_with_search_paths(stub)
# Resolve gradient trainrun
resolved, config_dict = resolve_trainrun_config(
stub,
session_id,
"deep_svdd",
overrides=[
"training.trainer.max_epochs=5",
"training.optimizer.lr=0.0005",
"training.optimizer.weight_decay=0.005",
],
)
apply_trainrun_config(stub, session_id, resolved.config_bytes)
# Statistical initialization
print("Statistical initialization...")
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
)
):
print(format_progress(progress))
# Gradient training
print("Gradient training...")
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
)
):
print(format_progress(progress))
stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
Example 3: Complete Workflow¶
File: examples/grpc/complete_workflow_client.py
from workflow_utils import (
build_stub, create_session_with_search_paths,
resolve_trainrun_config, apply_trainrun_config, format_progress
)
from cuvis_ai_core.grpc import cuvis_ai_pb2, helpers
import numpy as np
stub = build_stub("localhost:50051")
session_id = create_session_with_search_paths(stub)
# 1. Resolve and apply config
resolved, config_dict = resolve_trainrun_config(
stub, session_id, "channel_selector",
overrides=["training.trainer.max_epochs=3"],
)
apply_trainrun_config(stub, session_id, resolved.config_bytes)
# 2. Train pipeline
print("Training pipeline...")
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
)
):
print(format_progress(progress))
# 3. Save pipeline
save_response = stub.SavePipeline(
cuvis_ai_pb2.SavePipelineRequest(
session_id=session_id,
pipeline_path="outputs/my_pipeline.yaml",
)
)
print(f"Saved: {save_response.pipeline_path}, {save_response.weights_path}")
# 4. Run inference
cube = np.random.rand(1, 32, 32, 61).astype(np.float32)
wavelengths = np.linspace(430, 910, 61).reshape(1, -1).astype(np.int32)
inference = stub.Inference(
cuvis_ai_pb2.InferenceRequest(
session_id=session_id,
inputs=cuvis_ai_pb2.InputBatch(
cube=helpers.numpy_to_proto(cube),
wavelengths=helpers.numpy_to_proto(wavelengths),
),
)
)
results = {
name: helpers.proto_to_numpy(tensor_proto)
for name, tensor_proto in inference.outputs.items()
}
print(f"Inference outputs: {list(results.keys())}")
# 5. Clean up
stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
Example 4: Inference with Pretrained Model¶
File: examples/grpc/run_inference.py
from workflow_utils import build_stub, create_session_with_search_paths
from cuvis_ai_core.grpc import cuvis_ai_pb2, helpers
from cuvis_ai.datamodule.cu3s_dataset import SingleCu3sDataset
from torch.utils.data import DataLoader
from pathlib import Path
def run_inference(
pipeline_path: Path,
weights_path: Path,
cu3s_file_path: Path,
server_address: str = "localhost:50051",
):
stub = build_stub(server_address, max_msg_size=600 * 1024 * 1024)
session_id = create_session_with_search_paths(stub)
# Resolve pipeline config
pipeline_config = stub.ResolveConfig(
cuvis_ai_pb2.ResolveConfigRequest(
session_id=session_id,
config_type="pipeline",
path=str(pipeline_path),
overrides=[],
)
)
# Load pipeline and weights
stub.LoadPipeline(
cuvis_ai_pb2.LoadPipelineRequest(
session_id=session_id,
pipeline=cuvis_ai_pb2.PipelineConfig(
config_bytes=pipeline_config.config_bytes
),
)
)
stub.LoadPipelineWeights(
cuvis_ai_pb2.LoadPipelineWeightsRequest(
session_id=session_id,
weights_path=str(weights_path),
strict=True,
)
)
# Load data
dataset = SingleCu3sDataset(
cu3s_file_path=str(cu3s_file_path),
processing_mode="Reflectance",
)
dataloader = DataLoader(dataset, batch_size=1)
# Run inference
print(f"Running inference on {len(dataset)} samples...")
for batch in dataloader:
inference_response = stub.Inference(
cuvis_ai_pb2.InferenceRequest(
session_id=session_id,
inputs=cuvis_ai_pb2.InputBatch(
cube=helpers.tensor_to_proto(batch["cube"]),
wavelengths=helpers.tensor_to_proto(batch["wavelengths"]),
),
)
)
# Process outputs
decisions = helpers.proto_to_numpy(
inference_response.outputs["decider.decisions"]
)
print(f"Sample decisions: {decisions.shape}")
stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
if __name__ == "__main__":
run_inference(
pipeline_path=Path("outputs/my_experiment/pipeline.yaml"),
weights_path=Path("outputs/my_experiment/weights.pt"),
cu3s_file_path=Path("data/Lentils/Lentils_000.cu3s"),
)
Example 5: Restore TrainRun¶
File: examples/grpc/restore_trainrun_grpc.py
from workflow_utils import build_stub, create_session_with_search_paths, format_progress
from cuvis_ai_core.grpc import cuvis_ai_pb2
from pathlib import Path
from typing import Literal
def restore_trainrun_grpc(
trainrun_path: Path,
mode: Literal["info", "train", "validate", "test"] = "info",
weights_path: Path | None = None,
server_address: str = "localhost:50051",
):
stub = build_stub(server_address)
session_id = create_session_with_search_paths(stub)
    # Restore trainrun (the server creates and returns a new session)
    restore_response = stub.RestoreTrainRun(
        cuvis_ai_pb2.RestoreTrainRunRequest(
            trainrun_path=str(trainrun_path),
            weights_path=str(weights_path) if weights_path else None,
            strict=True,
        )
    )
    restored_session_id = restore_response.session_id
    if mode == "info":
        # Display pipeline info for the restored session
        inputs = stub.GetPipelineInputs(
            cuvis_ai_pb2.GetPipelineInputsRequest(session_id=restored_session_id)
        )
        outputs = stub.GetPipelineOutputs(
            cuvis_ai_pb2.GetPipelineOutputsRequest(session_id=restored_session_id)
        )
        print("Pipeline Inputs:")
        for name, spec in inputs.inputs.items():
            print(f"  {name}: {cuvis_ai_pb2.DType.Name(spec.dtype)} {spec.shape}")
        print("Pipeline Outputs:")
        for name, spec in outputs.outputs.items():
            print(f"  {name}: {cuvis_ai_pb2.DType.Name(spec.dtype)} {spec.shape}")
    elif mode == "train":
        # Continue training in the restored session
        print("Continuing training...")
        for progress in stub.Train(
            cuvis_ai_pb2.TrainRequest(
                session_id=restored_session_id,
                trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
            )
        ):
            print(format_progress(progress))
    # Close both the restored session and the search-path session
    stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=restored_session_id))
    stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
if __name__ == "__main__":
restore_trainrun_grpc(
trainrun_path=Path("outputs/my_experiment/trainrun.yaml"),
mode="train",
weights_path=Path("outputs/my_experiment/checkpoints/epoch=10.ckpt"),
)
Error Handling & Retries¶
gRPC Error Codes¶
import grpc
try:
response = stub.CreateSession(request)
except grpc.RpcError as exc:
code = exc.code()
details = exc.details()
if code == grpc.StatusCode.INVALID_ARGUMENT:
print(f"Invalid request: {details}")
elif code == grpc.StatusCode.NOT_FOUND:
print(f"Resource not found: {details}")
elif code == grpc.StatusCode.FAILED_PRECONDITION:
print(f"Operation not allowed: {details}")
elif code == grpc.StatusCode.UNAVAILABLE:
print(f"Server unavailable: {details}")
elif code == grpc.StatusCode.INTERNAL:
print(f"Internal error: {details}")
else:
print(f"gRPC error [{code}]: {details}")
Common error scenarios:
- INVALID_ARGUMENT: Malformed inputs (e.g., missing cube, invalid shape)
- NOT_FOUND: Unknown session ID or checkpoint path
- FAILED_PRECONDITION: Operation not allowed in current state (e.g., inference before loading pipeline)
- UNAVAILABLE: Server not running or network issues
- RESOURCE_EXHAUSTED: Message size exceeded or server overloaded
- INTERNAL: Unexpected server error (check server logs)
Retry Logic with Exponential Backoff¶
import time
import grpc
from cuvis_ai_core.grpc import cuvis_ai_pb2
def train_with_retry(
stub,
session_id: str,
trainer_type,
max_retries: int = 3,
):
"""Train with retry logic and exponential backoff."""
for attempt in range(max_retries):
try:
for progress in stub.Train(
cuvis_ai_pb2.TrainRequest(
session_id=session_id,
trainer_type=trainer_type,
)
):
yield progress
break # Success
except grpc.RpcError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"Retry {attempt + 1}/{max_retries} after {wait_time}s...")
time.sleep(wait_time)
else:
print(f"Max retries reached. Error: {e.details()}")
raise
Health Checks¶
import grpc
from workflow_utils import build_stub
from cuvis_ai_core.grpc import cuvis_ai_pb2

def check_server_health(server_address: str = "localhost:50051") -> bool:
"""Check if server is healthy."""
try:
stub = build_stub(server_address)
response = stub.CreateSession(cuvis_ai_pb2.CreateSessionRequest())
stub.CloseSession(
cuvis_ai_pb2.CloseSessionRequest(session_id=response.session_id)
)
return True
except grpc.RpcError:
return False
# Usage
if not check_server_health():
print("Server is not available!")
else:
print("Server is healthy")
Best Practices¶
1. Session Management¶
Always close sessions:
session_id = None
try:
session_id = create_session_with_search_paths(stub)
# ... operations ...
finally:
if session_id:
stub.CloseSession(
cuvis_ai_pb2.CloseSessionRequest(session_id=session_id)
)
Use context managers:
from contextlib import contextmanager
@contextmanager
def grpc_session(stub):
session_id = create_session_with_search_paths(stub)
try:
yield session_id
finally:
stub.CloseSession(
cuvis_ai_pb2.CloseSessionRequest(session_id=session_id)
)
# Usage
with grpc_session(stub) as session_id:
# ... operations ...
pass
# Session automatically closed
2. Message Size Configuration¶
Client and server must agree:
# Client
stub = build_stub("localhost:50051", max_msg_size=1024 * 1024 * 1024) # 1 GB
# Server must also have 1 GB limits
Guidelines:
- Default 300 MB: Suitable for most hyperspectral data
- 600 MB: Large cubes or high-resolution data
- 1 GB: Very large datasets or batch inference
3. Error Handling¶
Catch specific errors:
try:
response = stub.Inference(request)
except grpc.RpcError as exc:
if exc.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
print("Message too large. Reduce batch size or increase limits.")
elif exc.code() == grpc.StatusCode.UNAVAILABLE:
print("Server unavailable. Check connection and retry.")
else:
raise
4. Configuration Overrides¶
Use structured overrides:
# Good: Clear intent
overrides = [
"training.trainer.max_epochs=100",
"training.optimizer.lr=0.001",
"data.batch_size=16",
]
# Avoid: Hardcoded strings
overrides = ["max_epochs=100"] # Ambiguous
5. Streaming Progress¶
Handle all progress states:
for progress in stub.Train(...):
status = cuvis_ai_pb2.TrainStatus.Name(progress.status)
if status == "TRAIN_STATUS_RUNNING":
# Update progress bar
pass
elif status == "TRAIN_STATUS_COMPLETED":
# Training finished
break
elif status == "TRAIN_STATUS_FAILED":
# Handle failure
print(f"Training failed: {progress.error_message}")
break
6. Production Deployment¶
Use TLS in production:
# Never use insecure channels in production
# BAD: grpc.insecure_channel("production-server:50051")
# GOOD: Use TLS
channel = grpc.secure_channel(
"production-server:50051",
grpc.ssl_channel_credentials(),
)
Implement health checks:
# docker-compose.yml
healthcheck:
test: ["CMD", "python", "-c", "import grpc; ..."]
interval: 30s
timeout: 10s
retries: 3
Monitor resource usage (a GPU-memory sketch follows this list):
- Track active sessions
- Monitor GPU memory
- Log training progress
- Set session timeout
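For the GPU-memory point, a server-side sketch assuming PyTorch with CUDA, as in the GPU deployments above:
import logging
import torch

def log_gpu_memory() -> None:
    """Log allocated and reserved CUDA memory per visible device (sketch)."""
    if not torch.cuda.is_available():
        return
    for i in range(torch.cuda.device_count()):
        allocated_mb = torch.cuda.memory_allocated(i) / 1024**2
        reserved_mb = torch.cuda.memory_reserved(i) / 1024**2
        logging.info("GPU %d: %.0f MB allocated, %.0f MB reserved", i, allocated_mb, reserved_mb)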
Troubleshooting¶
Server Not Running¶
Problem: grpc.StatusCode.UNAVAILABLE
Solution:
# Check if server is running
ps aux | grep production_server
# Check port availability
netstat -an | grep 50051
# Test connection
telnet localhost 50051
# Restart server
docker-compose restart
Message Size Exceeded¶
Problem: grpc.StatusCode.RESOURCE_EXHAUSTED
Solution:
# Increase client limits
stub = build_stub("localhost:50051", max_msg_size=1024 * 1024 * 1024)
# OR reduce batch size
overrides=["data.batch_size=1"]
Session Expired¶
Problem: grpc.StatusCode.NOT_FOUND (session not found)
Solution:
- Sessions expire after 1 hour of inactivity
- Create a new session
- Implement periodic activity to keep the session alive, as in the helper below
import threading
import time
def keep_alive(stub, session_id, interval=300):
"""Send periodic heartbeat to keep session alive."""
while True:
time.sleep(interval)
try:
stub.GetPipelineInputs(
cuvis_ai_pb2.GetPipelineInputsRequest(session_id=session_id)
)
except grpc.RpcError:
break # Session closed or expired
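To keep a session alive in the background while other work proceeds, run the helper in a daemon thread:
heartbeat = threading.Thread(target=keep_alive, args=(stub, session_id), daemon=True)
heartbeat.start()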
CUDA Out of Memory¶
Problem: Training fails with CUDA OOM error
Solution:
# Reduce batch size
overrides=["data.batch_size=1"]
# Close unused sessions
stub.CloseSession(...)
# Monitor GPU memory
# nvidia-smi
Connection Timeout¶
Problem: Long-running operations time out
Solution:
# Configure keepalive pings so long-lived streams are not dropped
channel = grpc.insecure_channel(
"localhost:50051",
options=[
("grpc.max_receive_message_length", 1024 * 1024 * 1024),
("grpc.keepalive_time_ms", 30000),
("grpc.keepalive_timeout_ms", 10000),
],
)
See Also¶
- API Reference:
- gRPC API Reference - Complete RPC method documentation
- Protocol Definitions - Message format specifications
- Deployment:
- gRPC Deployment Guide - Production deployment patterns
- Docker Configuration - Container setup
- Kubernetes Setup - Orchestration
- Tutorials:
- gRPC Workflow Tutorial - Comprehensive end-to-end guide
- Examples:
  - examples/grpc/workflow_utils.py - Helper utilities
  - examples/grpc/statistical_training_client.py - Statistical training
  - examples/grpc/gradient_training_client.py - Gradient training
  - examples/grpc/complete_workflow_client.py - End-to-end workflow
  - examples/grpc/run_inference.py - Inference with pretrained models
  - examples/grpc/restore_trainrun_grpc.py - TrainRun restoration