
Status: Needs Review

This page has not been reviewed for accuracy and completeness. Content may be outdated or contain errors.


gRPC API Reference

Complete reference documentation for the CuvisAIService with 20 RPC methods.


Overview

The CUVIS.AI gRPC service exposes all functionality through a single CuvisAIService endpoint with 20 RPC methods organized into 6 functional categories:

| Category | Methods | Purpose |
|----------|---------|---------|
| Session Management | 3 | Create, configure, and close isolated execution contexts |
| Configuration Management | 4 | Resolve, validate, and apply Hydra configurations |
| Pipeline Management | 5 | Load, save, and manage pipeline state |
| Training Operations | 2 | Execute statistical and gradient-based training |
| Inference Operations | 1 | Run predictions on trained pipelines |
| Introspection & Discovery | 5 | Query capabilities, inspect pipelines, visualize graphs |

Protocol Buffers: All methods use Protocol Buffers (protobuf) for serialization.

Service Definition: CuvisAIService in cuvis_ai_core.proto


Connection & Setup

Creating a gRPC Stub

from cuvis_ai_core.grpc import cuvis_ai_pb2, cuvis_ai_pb2_grpc
import grpc

# Configure message size limits for hyperspectral data
options = [
    ("grpc.max_send_message_length", 300 * 1024 * 1024),  # 300 MB
    ("grpc.max_receive_message_length", 300 * 1024 * 1024),
]

# Create channel and stub
channel = grpc.insecure_channel("localhost:50051", options=options)
stub = cuvis_ai_pb2_grpc.CuvisAIServiceStub(channel)

For production, use secure channels with TLS:

credentials = grpc.ssl_channel_credentials()
channel = grpc.secure_channel("production-server:50051", credentials, options=options)

Helper Utilities

The examples/grpc/workflow_utils.py module provides convenience functions that simplify common operations:

from examples.grpc.workflow_utils import (
    build_stub,                         # Create configured stub
    config_search_paths,                # Build Hydra search paths
    create_session_with_search_paths,   # Session + search paths
    resolve_trainrun_config,            # Resolve config with Hydra
    apply_trainrun_config,              # Apply resolved config
    format_progress,                    # Pretty-print training progress
)

# Quick connection
stub = build_stub("localhost:50051", max_msg_size=600*1024*1024)

# Quick session setup
session_id = create_session_with_search_paths(stub)

Session Management

Sessions provide isolated execution contexts for each client. Each session has independent pipeline state, training configuration, and resources.

CreateSession

Purpose: Initialize a new isolated session context.

Request:

message CreateSessionRequest {}

Response:

message CreateSessionResponse {
  string session_id = 1;  // Unique session identifier (UUID)
}

Python Example:

response = stub.CreateSession(cuvis_ai_pb2.CreateSessionRequest())
session_id = response.session_id
print(f"Session created: {session_id}")

Notes:
- Sessions are isolated: separate pipeline, weights, data configs
- Sessions automatically expire after 1 hour of inactivity
- Each session consumes GPU/CPU memory until closed
- Session IDs are UUIDs (e.g., "7f3e4d2c-1a9b-4c8d-9e7f-2b5a6c8d9e0f")

See Also:
- SetSessionSearchPaths - Configure Hydra search paths
- CloseSession - Release resources


SetSessionSearchPaths

Purpose: Register Hydra configuration search paths for the session.

Request:

message SetSessionSearchPathsRequest {
  string session_id = 1;
  repeated string search_paths = 2;  // Absolute paths
  bool append = 3;                   // false = replace, true = append
}

Response:

message SetSessionSearchPathsResponse {
  bool success = 1;
}

Python Example:

from pathlib import Path

# Build search paths (typical pattern)
config_root = Path(__file__).parent.parent / "configs"
search_paths = [
    str(config_root),
    str(config_root / "trainrun"),
    str(config_root / "pipeline"),
    str(config_root / "data"),
    str(config_root / "training"),
]

# Register search paths
stub.SetSessionSearchPaths(
    cuvis_ai_pb2.SetSessionSearchPathsRequest(
        session_id=session_id,
        search_paths=search_paths,
        append=False,  # Replace any existing paths
    )
)

Notes:
- Must be called before ResolveConfig for Hydra composition to work
- Paths must be absolute (not relative)
- Use append=False (default) to replace existing paths
- Use append=True to add paths to the existing list
- Common pattern: call immediately after CreateSession

Helper Function:

from examples.grpc.workflow_utils import config_search_paths, create_session_with_search_paths

# Get standard search paths
paths = config_search_paths(extra_paths=["/custom/configs"])

# Create session with search paths in one call
session_id = create_session_with_search_paths(stub, search_paths=paths)

See Also:
- ResolveConfig - Resolve configs using these paths
- Hydra Composition Guide


CloseSession

Purpose: Release all resources associated with a session.

Request:

message CloseSessionRequest {
  string session_id = 1;
}

Response:

message CloseSessionResponse {
  bool success = 1;
}

Python Example:

# Always close sessions when done
stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))
print("Session closed, resources freed")

Best Practice - Use try/finally:

session_id = None
try:
    session_id = stub.CreateSession(...).session_id
    # ... training or inference ...
finally:
    if session_id:
        stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))

Notes:
- Always close sessions to free GPU/CPU memory immediately
- Unclosed sessions expire after 1 hour but hold resources until then
- Closing releases: pipeline, weights, data loaders, CUDA memory
- Safe to call even if the session doesn't exist (returns success=false)
- No error if the session has already expired
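
For reuse, the try/finally pattern can be wrapped in a context manager. The managed_session helper below is a sketch, not part of the shipped API:

from contextlib import contextmanager

@contextmanager
def managed_session(stub):
    """Create a session and guarantee CloseSession on exit (hypothetical helper)."""
    session_id = stub.CreateSession(cuvis_ai_pb2.CreateSessionRequest()).session_id
    try:
        yield session_id
    finally:
        stub.CloseSession(cuvis_ai_pb2.CloseSessionRequest(session_id=session_id))

# Usage: the session is closed even if the body raises
with managed_session(stub) as session_id:
    ...  # training or inference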

See Also:
- Client Patterns: Session Management


Configuration Management

The configuration service integrates with Hydra for powerful config composition, validation, and dynamic overrides.

ResolveConfig

Purpose: Resolve configuration using Hydra composition with optional overrides.

Request:

message ResolveConfigRequest {
  string session_id = 1;
  string config_type = 2;        // "trainrun", "pipeline", "training", "data"
  string path = 3;                // Relative path in search paths
  repeated string overrides = 4;  // Hydra override syntax
}

Response:

message ResolveConfigResponse {
  bytes config_bytes = 1;    // JSON-serialized config
  string resolved_path = 2;  // Full path that was resolved
}

Python Example:

import json

# Resolve trainrun config with overrides
response = stub.ResolveConfig(
    cuvis_ai_pb2.ResolveConfigRequest(
        session_id=session_id,
        config_type="trainrun",
        path="trainrun/deep_svdd",  # Or just "deep_svdd"
        overrides=[
            "training.trainer.max_epochs=50",
            "training.optimizer.lr=0.0005",
            "data.batch_size=8",
        ],
    )
)

# Parse returned JSON
config_dict = json.loads(response.config_bytes.decode("utf-8"))
print(f"Resolved config: {config_dict['name']}")
print(f"Pipeline: {config_dict['pipeline']['name']}")

Config Types:
- "trainrun" - Complete training run composition (pipeline + data + training)
- "pipeline" - Pipeline-only configuration
- "training" - Training parameters (optimizer, scheduler, trainer)
- "data" - Data loading configuration

Override Patterns:

overrides = [
    # Training parameters
    "training.trainer.max_epochs=100",
    "training.trainer.accelerator=gpu",
    "training.optimizer.lr=0.001",
    "training.optimizer.weight_decay=0.01",
    "training.scheduler.mode=min",
    "training.scheduler.patience=10",

    # Data parameters
    "data.batch_size=16",
    "data.train_ids=[0,1,2]",
    "data.val_ids=[3,4]",
    "data.cu3s_file_path=/data/Lentils_000.cu3s",

    # Pipeline node parameters
    "pipeline.nodes.channel_selector.params.tau_start=8.0",
    "pipeline.nodes.rx_detector.params.eps=1e-6",
    "pipeline.nodes.normalizer.params.use_running_stats=true",
]

Notes:
- Requires SetSessionSearchPaths to be called first
- Returns JSON bytes (decode with .decode("utf-8") and parse with json.loads())
- Hydra resolves config group composition, interpolations, and overrides
- Override syntax follows Hydra conventions (dot notation for nested fields)

Helper Function:

from examples.grpc.workflow_utils import resolve_trainrun_config

# Resolve trainrun config (returns response + parsed dict)
resolved, config_dict = resolve_trainrun_config(
    stub,
    session_id,
    "deep_svdd",
    overrides=["training.trainer.max_epochs=10"],
)

See Also:
- SetTrainRunConfig - Apply resolved config
- ValidateConfig - Pre-validate configs
- Hydra Composition Guide
- TrainRun Schema


SetTrainRunConfig

Purpose: Apply resolved trainrun configuration to session (builds pipeline, sets data/training configs).

Request:

message SetTrainRunConfigRequest {
  string session_id = 1;
  TrainRunConfig config = 2;
}

message TrainRunConfig {
  bytes config_bytes = 1;  // JSON from ResolveConfig
}

Response:

message SetTrainRunConfigResponse {
  bool success = 1;
}

Python Example:

# First resolve config
resolved = stub.ResolveConfig(
    cuvis_ai_pb2.ResolveConfigRequest(
        session_id=session_id,
        config_type="trainrun",
        path="trainrun/rx_statistical",
    )
)

# Apply to session
stub.SetTrainRunConfig(
    cuvis_ai_pb2.SetTrainRunConfigRequest(
        session_id=session_id,
        config=cuvis_ai_pb2.TrainRunConfig(config_bytes=resolved.config_bytes),
    )
)
print("TrainRun config applied, pipeline built")

What This Does:
1. Parses trainrun configuration JSON
2. Builds pipeline from pipeline config
3. Initializes data loader from data config
4. Sets training parameters (optimizer, scheduler, trainer)
5. Prepares session for training or inference

Notes:
- Must be called after ResolveConfig
- Replaces any existing pipeline/config in the session
- After this call, the session is ready for Train() or Inference()
- Validates config structure (raises error if malformed)

Helper Function:

from examples.grpc.workflow_utils import apply_trainrun_config

apply_trainrun_config(stub, session_id, resolved.config_bytes)

See Also:
- ResolveConfig - Resolve config first
- Train - Execute training after config applied


ValidateConfig

Purpose: Pre-validate configuration before applying (catch errors early).

Request:

message ValidateConfigRequest {
  string config_type = 1;  // "training", "pipeline", "data", etc.
  bytes config_bytes = 2;   // JSON configuration
}

Response:

message ValidateConfigResponse {
  bool valid = 1;
  repeated string errors = 2;     // Fatal errors
  repeated string warnings = 3;   // Non-fatal warnings
}

Python Example:

import json

# Validate training config before use
training_config = {
    "trainer": {"max_epochs": 10, "accelerator": "gpu"},
    "optimizer": {"name": "adam", "lr": 0.001},
}

validation = stub.ValidateConfig(
    cuvis_ai_pb2.ValidateConfigRequest(
        config_type="training",
        config_bytes=json.dumps(training_config).encode("utf-8"),
    )
)

if not validation.valid:
    print("Configuration validation failed:")
    for error in validation.errors:
        print(f"  ERROR: {error}")
    raise ValueError("Invalid configuration")

for warning in validation.warnings:
    print(f"  WARNING: {warning}")

Config Types:
- "training" - Training parameters (optimizer, scheduler, trainer)
- "pipeline" - Pipeline structure and node configs
- "data" - Data loading configuration
- "trainrun" - Complete trainrun composition

Common Validation Errors:
- Missing required fields (e.g., optimizer.lr)
- Invalid values (e.g., negative max_epochs)
- Type mismatches (e.g., string for numeric field)
- Unknown optimizer/scheduler names
- Incompatible node connections in pipeline

Notes:
- Validation is optional but highly recommended
- Catches configuration errors before starting training
- Warnings are informational (config still valid)
- Errors mean the config is invalid and will fail if applied
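
Putting the pieces together, a defensive workflow validates the resolved config before applying it. This sketch assumes session_id exists and search paths are already registered:

# Resolve, validate, then apply -- abort early on validation errors
resolved = stub.ResolveConfig(
    cuvis_ai_pb2.ResolveConfigRequest(
        session_id=session_id,
        config_type="trainrun",
        path="trainrun/deep_svdd",
    )
)

validation = stub.ValidateConfig(
    cuvis_ai_pb2.ValidateConfigRequest(
        config_type="trainrun",
        config_bytes=resolved.config_bytes,
    )
)
if not validation.valid:
    raise ValueError(f"Config rejected: {list(validation.errors)}")

stub.SetTrainRunConfig(
    cuvis_ai_pb2.SetTrainRunConfigRequest(
        session_id=session_id,
        config=cuvis_ai_pb2.TrainRunConfig(config_bytes=resolved.config_bytes),
    )
)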

See Also:
- SetTrainRunConfig - Apply config after validation
- TrainRun Schema


GetTrainingCapabilities

Purpose: Discover supported optimizers, schedulers, callbacks, and their parameter schemas.

Request:

message GetTrainingCapabilitiesRequest {}

Response:

message GetTrainingCapabilitiesResponse {
  repeated string optimizer_names = 1;     // e.g., ["adam", "sgd", "adamw"]
  repeated string scheduler_names = 2;     // e.g., ["step_lr", "reduce_on_plateau"]
  repeated string callback_names = 3;      // e.g., ["early_stopping", "model_checkpoint"]
  map<string, ParameterSchema> optimizer_schemas = 4;
  map<string, ParameterSchema> scheduler_schemas = 5;
}

Python Example:

capabilities = stub.GetTrainingCapabilities(
    cuvis_ai_pb2.GetTrainingCapabilitiesRequest()
)

print("Available optimizers:", capabilities.optimizer_names)
print("Available schedulers:", capabilities.scheduler_names)
print("Available callbacks:", capabilities.callback_names)

# Get parameter schema for Adam optimizer
adam_schema = capabilities.optimizer_schemas["adam"]
print(f"Adam parameters: {adam_schema}")

Use Cases:
- Dynamic UI generation (list available options)
- Config validation (check whether an optimizer exists; see the sketch below)
- Documentation generation
- Discovery for programmatic workflows
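
For example, a client can confirm an optimizer is supported before composing overrides (a short sketch using the response fields above):

requested = "adamw"
capabilities = stub.GetTrainingCapabilities(cuvis_ai_pb2.GetTrainingCapabilitiesRequest())

# Fail fast instead of discovering the problem at training time
if requested not in capabilities.optimizer_names:
    raise ValueError(
        f"Optimizer '{requested}' not supported; available: {list(capabilities.optimizer_names)}"
    )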

See Also:
- ValidateConfig - Validate configs using these capabilities


Pipeline Management

Manage pipeline loading, saving, and restoration.

LoadPipeline

Purpose: Build pipeline from YAML configuration.

Request:

message LoadPipelineRequest {
  string session_id = 1;
  PipelineConfig pipeline = 2;
}

message PipelineConfig {
  bytes config_bytes = 1;  // YAML or JSON serialized
}

Response:

message LoadPipelineResponse {
  bool success = 1;
}

Python Example:

from pathlib import Path
import yaml
import json

# Load pipeline config from YAML file
pipeline_yaml = yaml.safe_load(Path("pipeline.yaml").read_text())
pipeline_json = json.dumps(pipeline_yaml).encode("utf-8")

stub.LoadPipeline(
    cuvis_ai_pb2.LoadPipelineRequest(
        session_id=session_id,
        pipeline=cuvis_ai_pb2.PipelineConfig(config_bytes=pipeline_json),
    )
)
print("Pipeline loaded from config")

Notes:
- Pipeline config can be YAML or JSON (the server parses both)
- Builds the complete pipeline graph from node definitions and connections
- Does NOT load weights (use LoadPipelineWeights separately)
- After loading, the pipeline is ready for training or inference
- See Pipeline Schema for the config format

See Also:
- LoadPipelineWeights - Load trained weights
- SavePipeline - Save pipeline config + weights
- Pipeline Schema


LoadPipelineWeights

Purpose: Load trained weights into pipeline from checkpoint file.

Request:

message LoadPipelineWeightsRequest {
  string session_id = 1;
  string weights_path = 2;  // Path to .pt file
  bool strict = 3;          // Require exact match (default: true)
}

Response:

message LoadPipelineWeightsResponse {
  bool success = 1;
  repeated string missing_keys = 2;    // Keys in config but not in weights
  repeated string unexpected_keys = 3; // Keys in weights but not in config
}

Python Example:

# Load pipeline first, then weights
stub.LoadPipeline(...)  # See LoadPipeline example

response = stub.LoadPipelineWeights(
    cuvis_ai_pb2.LoadPipelineWeightsRequest(
        session_id=session_id,
        weights_path="outputs/deep_svdd.pt",
        strict=True,  # Fail if weights don't match exactly
    )
)

if response.missing_keys:
    print(f"Warning: Missing keys: {response.missing_keys}")
if response.unexpected_keys:
    print(f"Warning: Unexpected keys: {response.unexpected_keys}")

Strict vs Non-Strict Loading:
- strict=True (default): Fails if weights don't match the pipeline exactly
- strict=False: Loads matching weights, ignores mismatches (useful for transfer learning)
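
A partial load for transfer learning might look like this (a sketch; the checkpoint path is illustrative):

# Load only the weights that match the current pipeline structure
response = stub.LoadPipelineWeights(
    cuvis_ai_pb2.LoadPipelineWeightsRequest(
        session_id=session_id,
        weights_path="outputs/pretrained_backbone.pt",  # illustrative path
        strict=False,  # ignore layers that don't match
    )
)

# Mismatches are expected when architectures differ
print(f"Not found in checkpoint: {list(response.missing_keys)}")
print(f"Ignored from checkpoint: {list(response.unexpected_keys)}")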

Notes:
- Must call LoadPipeline first to build the pipeline structure
- The weights file is a PyTorch .pt checkpoint
- Use strict=False for transfer learning or partial weight loading
- Check missing_keys and unexpected_keys for debugging

See Also:
- LoadPipeline - Load pipeline config first
- SavePipeline - Save weights


SavePipeline

Purpose: Save pipeline configuration and weights to files.

Request:

message SavePipelineRequest {
  string session_id = 1;
  string pipeline_path = 2;        // Path for YAML config
  PipelineMetadata metadata = 3;   // Optional metadata
}

message PipelineMetadata {
  string name = 1;
  string description = 2;
  repeated string tags = 3;
  string author = 4;
}

Response:

message SavePipelineResponse {
  string pipeline_path = 1;  // Saved YAML config path
  string weights_path = 2;   // Saved .pt weights path
}

Python Example:

# Save pipeline after training
response = stub.SavePipeline(
    cuvis_ai_pb2.SavePipelineRequest(
        session_id=session_id,
        pipeline_path="outputs/my_pipeline.yaml",
        metadata=cuvis_ai_pb2.PipelineMetadata(
            name="Deep SVDD Anomaly Detector",
            description="Trained on Lentils dataset with 50 epochs",
            tags=["anomaly_detection", "deep_svdd", "production"],
            author="your_name",
        ),
    )
)

print(f"Pipeline saved to: {response.pipeline_path}")
print(f"Weights saved to: {response.weights_path}")

What Gets Saved:
1. YAML config - Complete pipeline structure and node parameters
2. Weights file (.pt) - PyTorch checkpoint with trained weights
3. Metadata - Optional name, description, tags, author

Notes:
- Automatically derives the weights path by replacing .yaml with .pt
- Metadata is embedded in the YAML config file
- Use this for inference-only deployment (no training state)
- For full reproducibility, use SaveTrainRun instead

See Also:
- LoadPipeline + LoadPipelineWeights - Restore pipeline
- SaveTrainRun - Save complete trainrun (includes data/training config)


SaveTrainRun

Purpose: Save complete trainrun configuration (pipeline + data + training configs).

Request:

message SaveTrainRunRequest {
  string session_id = 1;
  string trainrun_path = 2;  // Path for trainrun YAML
  bool save_weights = 3;     // Include weights (default: true)
}

Response:

message SaveTrainRunResponse {
  string trainrun_path = 1;  // Saved trainrun config
  string weights_path = 2;   // Saved weights (if save_weights=true)
}

Python Example:

# Save complete trainrun after training
response = stub.SaveTrainRun(
    cuvis_ai_pb2.SaveTrainRunRequest(
        session_id=session_id,
        trainrun_path="outputs/deep_svdd_run.yaml",
        save_weights=True,
    )
)

print(f"TrainRun saved to: {response.trainrun_path}")
print(f"Weights saved to: {response.weights_path}")

What Gets Saved:
- Pipeline config - Complete pipeline structure
- Data config - Data loading configuration (paths, train/val/test splits, batch size)
- Training config - Optimizer, scheduler, trainer parameters
- Weights (optional) - Trained model weights

TrainRun vs Pipeline:

| Feature | SavePipeline | SaveTrainRun |
|---------|--------------|--------------|
| Pipeline config | ✅ | ✅ |
| Weights | ✅ | ✅ |
| Data config | ❌ | ✅ |
| Training config | ❌ | ✅ |
| Use for | Inference deployment | Reproducibility, resume training |

Notes:
- Use SaveTrainRun for full reproducibility (can resume training later)
- Use SavePipeline for inference-only deployment (smaller, no training overhead)
- A trainrun can be restored with RestoreTrainRun

See Also:
- RestoreTrainRun - Restore complete trainrun
- SavePipeline - Save pipeline only


RestoreTrainRun

Purpose: Restore complete training run (pipeline + data + training + weights).

Request:

message RestoreTrainRunRequest {
  string trainrun_path = 1;  // Path to trainrun YAML
  string weights_path = 2;   // Optional custom weights path
  bool strict = 3;           // Strict weight loading (default: true)
}

Response:

message RestoreTrainRunResponse {
  string session_id = 1;  // NEW session created automatically
  bool success = 2;
}

Python Example:

# Restore trainrun (creates new session automatically)
response = stub.RestoreTrainRun(
    cuvis_ai_pb2.RestoreTrainRunRequest(
        trainrun_path="outputs/deep_svdd_run.yaml",
        weights_path="outputs/deep_svdd_run.pt",  # Optional override
        strict=True,
    )
)

session_id = response.session_id
print(f"TrainRun restored in session: {session_id}")

# Now ready for inference or continued training
inference_response = stub.Inference(
    cuvis_ai_pb2.InferenceRequest(session_id=session_id, inputs=...)
)

Key Feature: Automatic Session Creation
- RestoreTrainRun creates a new session automatically
- You don't need to call CreateSession first
- Returns the new session_id in the response
- The session has pipeline + weights + configs fully loaded

Notes:
- The most convenient way to restore trained models
- If weights_path is not specified, looks for a .pt file next to the .yaml
- Use strict=False for partial weight loading
- Remember to CloseSession when done

See Also:
- SaveTrainRun - Save trainrun for restoration
- Restore Pipeline Guide


Training Operations

Execute statistical and gradient-based training with streaming progress updates.

Train

Purpose: Execute training with streaming progress updates (statistical or gradient).

Request:

message TrainRequest {
  string session_id = 1;
  TrainerType trainer_type = 2;  // STATISTICAL or GRADIENT
}

enum TrainerType {
  TRAINER_TYPE_STATISTICAL = 0;
  TRAINER_TYPE_GRADIENT = 1;
}

Response (Server-Side Streaming):

message TrainResponse {
  ExecutionContext context = 1;      // Epoch, step, stage info
  TrainStatus status = 2;            // RUNNING, COMPLETED, FAILED
  map<string, float> losses = 3;     // Loss values
  map<string, float> metrics = 4;    // Metric values
  string message = 5;                // Progress message
}

Python Example - Statistical Training:

# Phase 1: Statistical initialization (short, no backprop)
for progress in stub.Train(
    cuvis_ai_pb2.TrainRequest(
        session_id=session_id,
        trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
    )
):
    stage = cuvis_ai_pb2.ExecutionStage.Name(progress.context.stage)
    status = cuvis_ai_pb2.TrainStatus.Name(progress.status)
    print(f"[{stage}] {status}: {progress.message}")

Python Example - Gradient Training:

# Phase 2: Gradient-based training (full backprop)
for progress in stub.Train(
    cuvis_ai_pb2.TrainRequest(
        session_id=session_id,
        trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
    )
):
    stage = cuvis_ai_pb2.ExecutionStage.Name(progress.context.stage)
    status = cuvis_ai_pb2.TrainStatus.Name(progress.status)

    if progress.losses:
        print(f"[{stage}] Epoch {progress.context.epoch} | losses={dict(progress.losses)}")
    if progress.metrics:
        print(f"[{stage}] Epoch {progress.context.epoch} | metrics={dict(progress.metrics)}")

Two-Phase Training Pattern:

# 1. Statistical initialization (RX, normalization stats, etc.)
for progress in stub.Train(
    cuvis_ai_pb2.TrainRequest(
        session_id=session_id,
        trainer_type=cuvis_ai_pb2.TRAINER_TYPE_STATISTICAL,
    )
):
    print(f"[statistical] {format_progress(progress)}")

# 2. Gradient-based fine-tuning (deep learning)
for progress in stub.Train(
    cuvis_ai_pb2.TrainRequest(
        session_id=session_id,
        trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
    )
):
    print(f"[gradient] {format_progress(progress)}")

Execution Stages:
- EXECUTION_STAGE_TRAIN - Training loop
- EXECUTION_STAGE_VALIDATE - Validation loop
- EXECUTION_STAGE_TEST - Test loop

Train Status:
- TRAIN_STATUS_RUNNING - Training in progress
- TRAIN_STATUS_COMPLETED - Training finished successfully
- TRAIN_STATUS_FAILED - Training failed with error

Helper Function:

from examples.grpc.workflow_utils import format_progress

for progress in stub.Train(...):
    print(format_progress(progress))
# Output: "[TRAIN] RUNNING | losses={'total': 0.42} | metrics={'iou': 0.85}"

Notes:
- Training is server-side streaming (the client receives updates as they occur)
- No polling required (updates are pushed in real time)
- Process the stream with a for-loop (blocks until training completes)
- Statistical training is typically fast (1-2 passes over the data)
- Gradient training duration depends on max_epochs in the training config
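
Because the stream blocks until training finishes, it is worth guarding it against transport errors and checking the final status (a sketch):

import grpc
from examples.grpc.workflow_utils import format_progress

final_status = None
try:
    for progress in stub.Train(
        cuvis_ai_pb2.TrainRequest(
            session_id=session_id,
            trainer_type=cuvis_ai_pb2.TRAINER_TYPE_GRADIENT,
        )
    ):
        final_status = progress.status
        print(format_progress(progress))
except grpc.RpcError as e:
    print(f"Stream interrupted: {e.code()} - {e.details()}")
    raise

if final_status != cuvis_ai_pb2.TRAIN_STATUS_COMPLETED:
    raise RuntimeError("Training did not complete successfully")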

See Also:
- GetTrainStatus - Query training status
- Sequence Diagrams


GetTrainStatus

Purpose: Query current training status for a session.

Request:

message GetTrainStatusRequest {
  string session_id = 1;
}

Response:

message GetTrainStatusResponse {
  TrainStatus status = 1;  // Current status
  string message = 2;      // Status message
}

Python Example:

response = stub.GetTrainStatus(
    cuvis_ai_pb2.GetTrainStatusRequest(session_id=session_id)
)

status = cuvis_ai_pb2.TrainStatus.Name(response.status)
print(f"Training status: {status}")
if response.message:
    print(f"Message: {response.message}")

Use Cases:
- Checking whether training is complete before inference
- Monitoring training from a separate process (see the polling sketch below)
- Debugging training issues

Notes:
- Returns the last known status (may be stale if training just started)
- For real-time updates, use Train streaming instead
- Status persists in the session until the next training call
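
For coarse monitoring from another process, a simple polling loop works (a sketch; the interval is arbitrary):

import time

while True:
    response = stub.GetTrainStatus(
        cuvis_ai_pb2.GetTrainStatusRequest(session_id=session_id)
    )
    print(f"Training status: {cuvis_ai_pb2.TrainStatus.Name(response.status)}")
    if response.status != cuvis_ai_pb2.TRAIN_STATUS_RUNNING:
        break
    time.sleep(10)  # arbitrary polling interval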


Inference Operations

Run predictions on trained pipelines.

Inference

Purpose: Run predictions on trained pipeline with optional output filtering.

Request:

message InferenceRequest {
  string session_id = 1;
  InputBatch inputs = 2;
  repeated string output_specs = 3;  // Optional: filter outputs
}

Response:

message InferenceResponse {
  map<string, Tensor> outputs = 1;  // Output tensors by name
  map<string, float> metrics = 2;   // Optional metrics
}

Python Example - Basic Inference:

from cuvis_ai_core.grpc import helpers
import numpy as np

# Prepare input data
cube = np.random.rand(1, 32, 32, 61).astype(np.float32)
wavelengths = np.linspace(430, 910, 61).reshape(1, -1).astype(np.float32)

# Run inference
response = stub.Inference(
    cuvis_ai_pb2.InferenceRequest(
        session_id=session_id,
        inputs=cuvis_ai_pb2.InputBatch(
            cube=helpers.numpy_to_proto(cube),
            wavelengths=helpers.numpy_to_proto(wavelengths),
        ),
    )
)

# Process outputs
for name, tensor_proto in response.outputs.items():
    output_array = helpers.proto_to_numpy(tensor_proto)
    print(f"{name}: shape={output_array.shape}, dtype={output_array.dtype}")

Python Example - Output Filtering:

# Request only specific outputs (reduces payload size)
response = stub.Inference(
    cuvis_ai_pb2.InferenceRequest(
        session_id=session_id,
        inputs=cuvis_ai_pb2.InputBatch(
            cube=helpers.numpy_to_proto(cube),
            wavelengths=helpers.numpy_to_proto(wavelengths),
        ),
        output_specs=[
            "selector.selected",   # Only selected channels
            "detector.scores",     # Anomaly scores
            "decider.decisions",   # Final decisions
        ],
    )
)

selected = helpers.proto_to_numpy(response.outputs["selector.selected"])
scores = helpers.proto_to_numpy(response.outputs["detector.scores"])
decisions = helpers.proto_to_numpy(response.outputs["decider.decisions"])

Python Example - Complex Input Types:

# Inference with bounding boxes and points (e.g., SAM integration)
response = stub.Inference(
    cuvis_ai_pb2.InferenceRequest(
        session_id=session_id,
        inputs=cuvis_ai_pb2.InputBatch(
            cube=helpers.numpy_to_proto(cube),
            wavelengths=helpers.numpy_to_proto(wavelengths),
            bboxes=cuvis_ai_pb2.BoundingBoxes(
                boxes=[
                    cuvis_ai_pb2.BoundingBox(
                        element_id=0,
                        x_min=10, y_min=10,
                        x_max=20, y_max=20,
                    )
                ]
            ),
            points=cuvis_ai_pb2.Points(
                points=[
                    cuvis_ai_pb2.Point(
                        element_id=0,
                        x=15.5, y=15.5,
                        type=cuvis_ai_pb2.POINT_TYPE_POSITIVE,
                    )
                ]
            ),
            text_prompt="Find anomalies in lentils",
        ),
    )
)

Output Filtering Benefits:
- Reduces network payload (important for large tensors)
- Faster response time (the server skips unused computations)
- Lower memory usage on the client
- Use when you only need a subset of pipeline outputs

Notes:
- The pipeline must be loaded and trained (or weights loaded) first
- InputBatch supports: cube, wavelengths, mask, bboxes, points, text_prompt
- Output filtering is optional (omit output_specs to get all outputs)
- Use helpers.numpy_to_proto() and helpers.proto_to_numpy() for conversions

See Also:
- InputBatch Data Type
- GetPipelineInputs - Query required inputs
- GetPipelineOutputs - Query available outputs


Introspection & Discovery

Query pipeline capabilities, inspect structure, and visualize graphs.

GetPipelineInputs

Purpose: Get input tensor specifications for current pipeline.

Request:

message GetPipelineInputsRequest {
  string session_id = 1;
}

Response:

message GetPipelineInputsResponse {
  repeated string input_names = 1;             // Input port names
  map<string, TensorSpec> input_specs = 2;    // Specs by name
}

Python Example:

response = stub.GetPipelineInputs(
    cuvis_ai_pb2.GetPipelineInputsRequest(session_id=session_id)
)

print("Pipeline inputs:")
for name in response.input_names:
    spec = response.input_specs[name]
    shape = list(spec.shape)
    dtype = cuvis_ai_pb2.DType.Name(spec.dtype)
    required = "required" if spec.required else "optional"
    print(f"  {name}: shape={shape}, dtype={dtype}, {required}")

Example Output:

Pipeline inputs:
  cube: shape=[1, -1, -1, 61], dtype=D_TYPE_FLOAT32, required
  wavelengths: shape=[1, 61], dtype=D_TYPE_FLOAT32, required
  mask: shape=[1, -1, -1], dtype=D_TYPE_UINT8, optional

Notes:
- Shape dimensions of -1 indicate dynamic sizes
- required=true means the input must be provided for inference
- required=false means the input is optional
- The pipeline must be loaded first
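
Clients can use these specs to sanity-check arrays before sending them. The matches_spec helper below is a sketch that treats -1 as a wildcard dimension:

import numpy as np

def matches_spec(array: np.ndarray, spec) -> bool:
    """Check an array against a TensorSpec shape; -1 matches any size (sketch helper)."""
    if len(array.shape) != len(spec.shape):
        return False
    return all(s == -1 or s == a for s, a in zip(spec.shape, array.shape))

cube = np.random.rand(1, 128, 128, 61).astype(np.float32)
spec = response.input_specs["cube"]  # response from the GetPipelineInputs example above
assert matches_spec(cube, spec), f"cube shape {cube.shape} violates spec {list(spec.shape)}"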

See Also:
- GetPipelineOutputs
- TensorSpec Data Type


GetPipelineOutputs

Purpose: Get output tensor specifications for current pipeline.

Request:

message GetPipelineOutputsRequest {
  string session_id = 1;
}

Response:

message GetPipelineOutputsResponse {
  repeated string output_names = 1;            // Output port names
  map<string, TensorSpec> output_specs = 2;   // Specs by name
}

Python Example:

response = stub.GetPipelineOutputs(
    cuvis_ai_pb2.GetPipelineOutputsRequest(session_id=session_id)
)

print("Pipeline outputs:")
for name in response.output_names:
    spec = response.output_specs[name]
    shape = list(spec.shape)
    dtype = cuvis_ai_pb2.DType.Name(spec.dtype)
    print(f"  {name}: shape={shape}, dtype={dtype}")

Example Output:

Pipeline outputs:
  selector.selected: shape=[1, -1, -1, 4], dtype=D_TYPE_FLOAT32
  detector.scores: shape=[1, -1, -1], dtype=D_TYPE_FLOAT32
  decider.decisions: shape=[1, -1, -1], dtype=D_TYPE_UINT8

Use Cases:
- Discovering available outputs before inference
- Validating pipeline structure
- Generating documentation
- Dynamic UI generation


GetPipelineVisualization

Purpose: Render pipeline graph as image or text format.

Request:

message GetPipelineVisualizationRequest {
  string session_id = 1;
  string format = 2;  // "png", "svg", "dot", "mermaid"
}

Response:

message GetPipelineVisualizationResponse {
  bytes image_data = 1;  // Binary image data or text
  string format = 2;     // Confirmed format
}

Python Example - PNG:

from pathlib import Path

response = stub.GetPipelineVisualization(
    cuvis_ai_pb2.GetPipelineVisualizationRequest(
        session_id=session_id,
        format="png",
    )
)

Path("pipeline_graph.png").write_bytes(response.image_data)
print("Pipeline visualization saved to pipeline_graph.png")

Python Example - Mermaid:

response = stub.GetPipelineVisualization(
    cuvis_ai_pb2.GetPipelineVisualizationRequest(
        session_id=session_id,
        format="mermaid",
    )
)

mermaid_text = response.image_data.decode("utf-8")
print("Pipeline in Mermaid format:")
print(mermaid_text)

Supported Formats:
- "png" - PNG image (binary)
- "svg" - SVG image (text/XML)
- "dot" - Graphviz DOT format (text)
- "mermaid" - Mermaid diagram (text)
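
Since PNG is binary while the other formats are text, a small save helper can branch on the returned format (a sketch):

from pathlib import Path

def save_visualization(response, stem="pipeline_graph"):
    """Write a visualization response to disk, decoding text formats (sketch helper)."""
    path = Path(f"{stem}.{response.format}")
    if response.format == "png":
        path.write_bytes(response.image_data)  # binary payload
    else:
        path.write_text(response.image_data.decode("utf-8"))  # svg/dot/mermaid are text
    return path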

Use Cases:
- Documentation generation
- Debugging pipeline structure
- Presenting architecture to stakeholders
- Automated diagram generation in CI/CD


ListAvailablePipelines

Purpose: Discover registered pipelines available for loading.

Request:

message ListAvailablePipelinesRequest {
  string filter_tag = 1;  // Optional tag filter
}

Response:

message ListAvailablePipelinesResponse {
  repeated PipelineInfo pipelines = 1;
}

message PipelineInfo {
  string name = 1;
  PipelineMetadata metadata = 2;
  repeated string tags = 3;
}

Python Example:

response = stub.ListAvailablePipelines(
    cuvis_ai_pb2.ListAvailablePipelinesRequest(
        filter_tag="anomaly_detection",  # Optional filter
    )
)

print("Available pipelines:")
for pipeline in response.pipelines:
    print(f"  - {pipeline.name}")
    print(f"    Description: {pipeline.metadata.description}")
    print(f"    Tags: {', '.join(pipeline.tags)}")

Use Cases:
- Pipeline discovery for users
- Dynamic pipeline selection in applications
- Catalog generation


GetPipelineInfo

Purpose: Get detailed metadata for a specific pipeline.

Request:

message GetPipelineInfoRequest {
  string pipeline_name = 1;
}

Response:

message GetPipelineInfoResponse {
  PipelineInfo info = 1;
  map<string, TensorSpec> required_inputs = 2;
  map<string, TensorSpec> outputs = 3;
}

Python Example:

response = stub.GetPipelineInfo(
    cuvis_ai_pb2.GetPipelineInfoRequest(
        pipeline_name="deep_svdd_anomaly"
    )
)

print(f"Pipeline: {response.info.name}")
print(f"Description: {response.info.metadata.description}")
print(f"Required inputs: {list(response.required_inputs.keys())}")
print(f"Outputs: {list(response.outputs.keys())}")


Error Handling

All RPCs use standard gRPC status codes for error handling.

Status Codes

import grpc

try:
    response = stub.Inference(request)
except grpc.RpcError as e:
    code = e.code()
    details = e.details()

    if code == grpc.StatusCode.INVALID_ARGUMENT:
        print(f"Invalid request: {details}")
    elif code == grpc.StatusCode.NOT_FOUND:
        print(f"Resource not found: {details}")
    elif code == grpc.StatusCode.FAILED_PRECONDITION:
        print(f"Operation not allowed: {details}")
    elif code == grpc.StatusCode.RESOURCE_EXHAUSTED:
        print(f"Resource exhausted: {details}")
    elif code == grpc.StatusCode.INTERNAL:
        print(f"Internal server error: {details}")
    else:
        print(f"gRPC error: {code} - {details}")
        raise

Common Error Codes

| Code | Meaning | Common Causes |
|------|---------|---------------|
| INVALID_ARGUMENT | Malformed request | Missing required fields, invalid types |
| NOT_FOUND | Resource not found | Session ID doesn't exist, expired session |
| FAILED_PRECONDITION | Operation not allowed | Training before config set, inference before weights loaded |
| RESOURCE_EXHAUSTED | Resource limit exceeded | Message size exceeded, GPU out of memory |
| INTERNAL | Server error | Unexpected exception, configuration bug |
| UNAVAILABLE | Service unavailable | Server down, network issue |
| DEADLINE_EXCEEDED | Operation timeout | Long training, slow network |
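
Transient failures such as UNAVAILABLE are often worth retrying with backoff. The retry policy below is an illustration, not part of the service:

import time
import grpc

RETRYABLE = {grpc.StatusCode.UNAVAILABLE}

def call_with_retry(rpc, request, attempts=3, base_delay=1.0):
    """Retry a unary RPC on transient failures with exponential backoff (sketch)."""
    for attempt in range(attempts):
        try:
            return rpc(request)
        except grpc.RpcError as e:
            if e.code() not in RETRYABLE or attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2**attempt)

# Usage
status = call_with_retry(
    stub.GetTrainStatus,
    cuvis_ai_pb2.GetTrainStatusRequest(session_id=session_id),
)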

Error Examples

Session Not Found:

grpc.StatusCode.NOT_FOUND
"Session '7f3e4d2c-1a9b-4c8d-9e7f-2b5a6c8d9e0f' not found or expired"

Message Size Exceeded:

grpc.StatusCode.RESOURCE_EXHAUSTED
"Received message larger than max (100000000 vs. 4194304)"
Solution: Increase client/server message size limits

Missing Config:

grpc.StatusCode.FAILED_PRECONDITION
"Cannot train: trainrun config not set. Call SetTrainRunConfig first."

CUDA Out of Memory:

grpc.StatusCode.RESOURCE_EXHAUSTED
"CUDA out of memory. Tried to allocate 2.00 GiB"
Solution: Reduce batch size, close unused sessions


Data Types Reference

InputBatch

Complete input specification for inference.

message InputBatch {
  Tensor cube = 1;              // Required: [B, H, W, C] hyperspectral cube
  Tensor wavelengths = 2;       // Optional: [B, C] wavelengths in nm
  Tensor mask = 3;              // Optional: [B, H, W] binary mask
  BoundingBoxes bboxes = 4;     // Optional: Bounding boxes
  Points points = 5;            // Optional: Point prompts
  string text_prompt = 6;       // Optional: Text description
  map<string, Tensor> extra_inputs = 7;  // Optional: Additional inputs
}

Example:

inputs = cuvis_ai_pb2.InputBatch(
    cube=helpers.numpy_to_proto(cube),                    # Required
    wavelengths=helpers.numpy_to_proto(wavelengths),      # Optional
    mask=helpers.numpy_to_proto(mask),                    # Optional
    bboxes=cuvis_ai_pb2.BoundingBoxes(boxes=[...]),       # Optional
    points=cuvis_ai_pb2.Points(points=[...]),             # Optional
    text_prompt="Find anomalies",                         # Optional
)
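
The extra_inputs map carries additional named tensors for pipelines that need them; the key name below is hypothetical:

import numpy as np

reference = np.zeros((1, 61), dtype=np.float32)  # hypothetical auxiliary tensor
inputs = cuvis_ai_pb2.InputBatch(
    cube=helpers.numpy_to_proto(cube),
    extra_inputs={"reference": helpers.numpy_to_proto(reference)},  # hypothetical key
)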


Tensor

Protocol Buffer tensor representation.

message Tensor {
  repeated int64 shape = 1;  // Tensor dimensions
  DType dtype = 2;           // Data type
  bytes raw_data = 3;        // Raw binary data
}

enum DType {
  D_TYPE_FLOAT32 = 0;
  D_TYPE_FLOAT64 = 1;
  D_TYPE_INT32 = 2;
  D_TYPE_INT64 = 3;
  D_TYPE_UINT8 = 4;
  D_TYPE_UINT16 = 5;
  // ...
}

Conversion Helpers:

from cuvis_ai_core.grpc import helpers

# NumPy array to Tensor
array = np.random.rand(1, 32, 32, 61).astype(np.float32)
tensor_proto = helpers.numpy_to_proto(array)

# Tensor to NumPy array
array_recovered = helpers.proto_to_numpy(tensor_proto)

# PyTorch tensor to Tensor
tensor = torch.randn(1, 32, 32, 61)
tensor_proto = helpers.tensor_to_proto(tensor)


TensorSpec

Tensor specification (metadata without data).

message TensorSpec {
  string name = 1;           // Tensor name
  repeated int64 shape = 2;  // Shape (-1 for dynamic dimensions)
  DType dtype = 3;           // Data type
  bool required = 4;         // Required for inference
}

Example:

name: "cube"
shape: [1, -1, -1, 61]  // Batch=1, Height/Width dynamic, Channels=61
dtype: D_TYPE_FLOAT32
required: true


BoundingBoxes

Bounding box collection for object detection or SAM integration.

message BoundingBoxes {
  repeated BoundingBox boxes = 1;
}

message BoundingBox {
  int32 element_id = 1;  // Batch element index
  float x_min = 2;
  float y_min = 3;
  float x_max = 4;
  float y_max = 5;
}

Example:

bboxes = cuvis_ai_pb2.BoundingBoxes(
    boxes=[
        cuvis_ai_pb2.BoundingBox(
            element_id=0,
            x_min=10.0, y_min=10.0,
            x_max=20.0, y_max=20.0,
        ),
        cuvis_ai_pb2.BoundingBox(
            element_id=0,
            x_min=30.0, y_min=30.0,
            x_max=40.0, y_max=40.0,
        ),
    ]
)


Points

Point prompts for interactive segmentation (SAM-style).

message Points {
  repeated Point points = 1;
}

message Point {
  int32 element_id = 1;   // Batch element index
  float x = 2;
  float y = 3;
  PointType type = 4;     // POSITIVE or NEGATIVE
}

enum PointType {
  POINT_TYPE_POSITIVE = 0;
  POINT_TYPE_NEGATIVE = 1;
}

Example:

points = cuvis_ai_pb2.Points(
    points=[
        cuvis_ai_pb2.Point(
            element_id=0,
            x=15.5, y=15.5,
            type=cuvis_ai_pb2.POINT_TYPE_POSITIVE,
        ),
        cuvis_ai_pb2.Point(
            element_id=0,
            x=25.5, y=25.5,
            type=cuvis_ai_pb2.POINT_TYPE_NEGATIVE,
        ),
    ]
)


See Also

- Tutorials & Guides
- Configuration
- Deployment